from __future__ import annotations
import asyncio
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import SimpleNamespace
import pytest
from beaver.engine import AgentLoop, EngineLoader
from beaver.engine.context import SkillContext
from beaver.engine.providers.base import LLMProvider, LLMResponse
from beaver.engine.providers.factory import ProviderBundle
from beaver.memory.runs import RunMemoryStore, RunRecord, SkillEffectRecord
from beaver.memory.skills import SkillLearningStore
from beaver.services.memory_service import MemoryService
from beaver.skills.assembler import SkillAssemblyResult
from beaver.skills.catalog.loader import SkillsLoader
from beaver.skills.drafts import DraftService
from beaver.skills.learning import EvidenceSelector, SkillLearningService
from beaver.skills.publisher import SkillPublisher
from beaver.skills.reviews import ReviewService
from beaver.skills.specs import SkillActivationReceipt, SkillSpecStore
class StubProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]) -> None:
super().__init__()
self._responses = list(responses)
self.calls: list[dict] = []
async def chat(
self,
messages: list[dict],
tools: list[dict] | None = None,
model: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
thinking_enabled: bool | None = None,
) -> LLMResponse:
self.calls.append(
{
"messages": messages,
"tools": tools,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"thinking_enabled": thinking_enabled,
}
)
if not self._responses:
raise AssertionError("No stubbed provider responses left")
return self._responses.pop(0)
def get_default_model(self) -> str:
return "stub-model"
class StubSkillAssembler:
def __init__(self, activated_skills: list[SkillContext]) -> None:
self.activated_skills = activated_skills
self.calls: list[dict] = []
async def assemble(self, **kwargs) -> SkillAssemblyResult:
self.calls.append(kwargs)
return SkillAssemblyResult(activated_skills=list(self.activated_skills))
class RecordingToolAssembler:
def __init__(self) -> None:
self.calls: list[dict] = []
async def assemble(self, **kwargs):
self.calls.append(kwargs)
return kwargs["registry"].get_specs(["memory"])
def _tool_call(*, name: str = "echo", arguments: dict | None = None, call_id: str = "call-1") -> SimpleNamespace:
return SimpleNamespace(
id=call_id,
name=name,
arguments=arguments or {"message": "again"},
)
def _publish_skill(
store: SkillSpecStore,
*,
skill_name: str,
body: str,
description: str,
actor: str = "tester",
) -> str:
drafts = DraftService(store)
reviews = ReviewService(store)
publisher = SkillPublisher(store)
draft = drafts.create_new_skill_draft(
skill_name=skill_name,
proposed_content=body,
proposed_frontmatter={"description": description, "tools": ["terminal"]},
created_by=actor,
reason=f"create {skill_name}",
)
reviews.approve(skill_name, draft.draft_id, reviewer=actor, notes="ok")
version = publisher.publish(skill_name, draft.draft_id, publisher=actor, notes="publish")
return version.version
def _receipt(
*,
run_id: str,
session_id: str,
skill_name: str,
skill_version: str,
activated_at: str,
) -> SkillActivationReceipt:
return SkillActivationReceipt(
run_id=run_id,
session_id=session_id,
skill_name=skill_name,
skill_version=skill_version,
content_hash=f"{skill_name}-{skill_version}",
activated_at=activated_at,
activation_reason="selected",
tool_hints=["terminal"],
)
def test_memory_service_snapshot_stays_frozen_until_reload(tmp_path: Path) -> None:
service = MemoryService(tmp_path / "memory")
service.initialize()
initial_snapshot = service.get_snapshot()
assert initial_snapshot.memory_block is None
result = service.get_store().add("memory", "Remember to inspect Docker container logs first.")
assert result["success"] is True
frozen_snapshot = service.get_snapshot()
assert frozen_snapshot.memory_block is None
service.reload_for_new_run()
refreshed_snapshot = service.get_snapshot()
assert "Docker container logs" in (refreshed_snapshot.memory_block or "")
def test_skill_loader_only_uses_active_published_versions(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
active_version = _publish_skill(
store,
skill_name="docker-debug",
body="# Docker Debug\n\nUse `docker logs` before changing config.\n",
description="Debug Docker containers.",
)
_publish_skill(
store,
skill_name="archived-debug",
body="# Archived\n\nOld instructions.\n",
description="Should be hidden from runtime.",
)
SkillPublisher(store).disable("archived-debug", actor="tester", reason="superseded")
loader = SkillsLoader(tmp_path, skill_store=store)
assert loader.get_current_version("docker-debug") == active_version
assert {record.name for record in loader.list_published_skills()} == {"docker-debug"}
assert {item["name"] for item in loader.build_selection_candidates()} == {"docker-debug"}
assert "docker logs" in (loader.load_published_skill("docker-debug") or "").lower()
def test_skill_lifecycle_publish_revision_and_rollback(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
drafts = DraftService(store)
reviews = ReviewService(store)
publisher = SkillPublisher(store)
initial_version = _publish_skill(
store,
skill_name="release-checklist",
body="# Release Checklist\n\nRun tests.\n",
description="Release workflow.",
)
assert initial_version == "v0001"
revision = drafts.create_revision_draft(
skill_name="release-checklist",
base_version=initial_version,
proposed_content="# Release Checklist\n\nRun tests.\nShip artifacts.\n",
proposed_frontmatter={"description": "Release workflow.", "tools": ["terminal"]},
created_by="tester",
reason="add artifact step",
)
reviews.approve("release-checklist", revision.draft_id, reviewer="reviewer", notes="ship it")
published = publisher.publish("release-checklist", revision.draft_id, publisher="reviewer", notes="v2")
assert published.version == "v0002"
assert store.get_current_version("release-checklist") == "v0002"
with pytest.raises(ValueError, match="submitted for review"):
publisher.publish("release-checklist", revision.draft_id, publisher="reviewer", notes="duplicate")
rolled_back = publisher.rollback("release-checklist", "v0001", actor="reviewer", reason="regression")
assert rolled_back.current_version == "v0001"
assert store.get_current_version("release-checklist") == "v0001"
assert set(store.list_versions("release-checklist")) == {"v0001", "v0002"}
def test_skill_lifecycle_retire_proposal_disables_without_new_version(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
drafts = DraftService(store)
reviews = ReviewService(store)
publisher = SkillPublisher(store)
initial_version = _publish_skill(
store,
skill_name="svn-migration",
body="# SVN Migration\n\nUse the legacy checklist only for SVN repositories.\n",
description="Legacy SVN migration workflow.",
)
retire = drafts.create_retire_proposal(
skill_name="svn-migration",
base_version=initial_version,
created_by="tester",
reason="unused legacy workflow",
)
reviews.approve("svn-migration", retire.draft_id, reviewer="reviewer", notes="retire")
with pytest.raises(ValueError, match="Retire proposals"):
publisher.publish("svn-migration", retire.draft_id, publisher="reviewer", notes="wrong path")
assert store.get_current_version("svn-migration") == initial_version
assert store.list_versions("svn-migration") == [initial_version]
spec = publisher.apply_retire_proposal(
"svn-migration",
retire.draft_id,
actor="reviewer",
notes="retired after review",
)
assert spec.status == "disabled"
assert spec.current_version == initial_version
assert store.get_current_version("svn-migration") == initial_version
assert store.list_versions("svn-migration") == [initial_version]
assert store.read_draft("svn-migration", retire.draft_id).status == "disabled" # type: ignore[union-attr]
assert "svn-migration" not in store.list_published_skill_names()
def test_skill_spec_store_lists_new_skill_drafts_before_publish(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
draft = DraftService(store).create_new_skill_draft(
skill_name="brand-new-skill",
proposed_content="# Brand New Skill\n\nDraft body.\n",
proposed_frontmatter={"description": "Draft only."},
created_by="tester",
reason="capture a repeated workflow",
)
drafts = store.list_drafts()
assert [item.draft_id for item in drafts] == [draft.draft_id]
assert drafts[0].skill_name == "brand-new-skill"
def test_skill_learning_service_generates_candidates_and_retire_draft(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
run_store = RunMemoryStore(tmp_path / "memory" / "runs")
learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
draft_service = DraftService(store)
service = SkillLearningService(
run_store=run_store,
learning_store=learning_store,
draft_service=draft_service,
evidence_selector=EvidenceSelector(run_store),
)
now = datetime.now(timezone.utc)
stale = (now - timedelta(days=45)).isoformat()
recent = now.isoformat()
failing_runs = [
RunRecord(
run_id=f"revise-{index}",
session_id="session-revise",
task_text="Fix the flaky deployment health check",
started_at=recent,
ended_at=recent,
success=False,
finish_reason="error",
feedback={},
activated_skills=[_receipt(
run_id=f"revise-{index}",
session_id="session-revise",
skill_name="deploy-debug",
skill_version="v0002",
activated_at=recent,
)],
)
for index in range(2)
]
for record in failing_runs:
run_store.append_run_record(record)
run_store.append_skill_effect(
SkillEffectRecord(
run_id=record.run_id,
skill_name="deploy-debug",
skill_version="v0002",
success=False,
feedback_score=None,
notes="error",
created_at=recent,
)
)
for index in range(2):
run_store.append_run_record(
RunRecord(
run_id=f"new-{index}",
session_id="session-new",
task_text="Generate a weekly metrics digest for stakeholders",
started_at=recent,
ended_at=recent,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied"},
activated_skills=[],
task_id=f"task-new-{index}",
attempt_index=1,
validation_result={"accepted": True, "score": 0.9},
)
)
for index in range(2):
run_store.append_run_record(
RunRecord(
run_id=f"simple-chat-{index}",
session_id="session-simple",
task_text="你是谁",
started_at=recent,
ended_at=recent,
success=True,
finish_reason="stop",
feedback={},
activated_skills=[],
task_id=None,
attempt_index=None,
validation_result=None,
)
)
for index in range(2):
receipts = [
_receipt(
run_id=f"merge-{index}",
session_id="session-merge",
skill_name="docker-debug",
skill_version="v0001",
activated_at=recent,
),
_receipt(
run_id=f"merge-{index}",
session_id="session-merge",
skill_name="k8s-debug",
skill_version="v0003",
activated_at=recent,
),
]
run_store.append_run_record(
RunRecord(
run_id=f"merge-{index}",
session_id="session-merge",
task_text="Investigate staging outage and compare container health checks",
started_at=recent,
ended_at=recent,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied"},
activated_skills=receipts,
task_id=f"task-merge-{index}",
attempt_index=1,
validation_result={"accepted": True, "score": 0.9},
)
)
for receipt in receipts:
run_store.append_skill_effect(
SkillEffectRecord(
run_id=f"merge-{index}",
skill_name=receipt.skill_name,
skill_version=receipt.skill_version,
success=True,
feedback_score=None,
notes="stop",
created_at=recent,
)
)
run_store.append_run_record(
RunRecord(
run_id="retire-1",
session_id="session-retire",
task_text="Legacy SVN migration checklist",
started_at=stale,
ended_at=stale,
success=True,
finish_reason="stop",
feedback={},
activated_skills=[_receipt(
run_id="retire-1",
session_id="session-retire",
skill_name="svn-migration",
skill_version="v0001",
activated_at=stale,
)],
)
)
run_store.append_skill_effect(
SkillEffectRecord(
run_id="retire-1",
skill_name="svn-migration",
skill_version="v0001",
success=True,
feedback_score=None,
notes="stop",
created_at=stale,
)
)
service.rescore_skill_versions()
candidates = service.build_learning_candidates()
kinds = {candidate.kind for candidate in candidates}
assert {"revise_skill", "new_skill", "merge_skills", "retire_skill"} <= kinds
new_candidates = [candidate for candidate in candidates if candidate.kind == "new_skill"]
assert new_candidates
assert all("simple-chat" not in run_id for candidate in new_candidates for run_id in candidate.source_run_ids)
retire_candidate = next(candidate for candidate in candidates if candidate.kind == "retire_skill")
retire_draft = asyncio.run(
service.synthesize_draft(
retire_candidate.candidate_id,
ProviderBundle(main_runtime=None, main_provider=None),
)
)
assert retire_draft.proposal_kind == "retire_skill"
assert retire_draft.status == "draft"
assert store.read_draft("svn-migration", retire_draft.draft_id) is not None
def test_skill_learning_service_generates_task_scoped_candidates(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
run_store = RunMemoryStore(tmp_path / "memory" / "runs")
learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
service = SkillLearningService(
run_store=run_store,
learning_store=learning_store,
draft_service=DraftService(store),
evidence_selector=EvidenceSelector(run_store),
)
now = datetime.now(timezone.utc).isoformat()
receipt = _receipt(
run_id="task-run-1",
session_id="session-task",
skill_name="api-review",
skill_version="v0001",
activated_at=now,
)
run_store.append_run_record(
RunRecord(
run_id="task-run-1",
session_id="session-task",
task_id="task-1",
attempt_index=1,
task_text="Review API compatibility",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied"},
activated_skills=[receipt],
validation_result={"accepted": True, "score": 0.9},
)
)
run_store.append_run_record(
RunRecord(
run_id="other-task-run",
session_id="session-other",
task_id="task-2",
attempt_index=1,
task_text="Review API compatibility",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied"},
activated_skills=[],
validation_result={"accepted": True, "score": 0.9},
)
)
candidates = service.build_learning_candidates_for_task("task-1", trigger_run_id="task-run-1")
assert [candidate.candidate_id for candidate in candidates] == ["revise:api-review:v0001:task:task-1"]
assert candidates[0].source_run_ids == ["task-run-1"]
assert candidates[0].related_skill_names == ["api-review"]
assert candidates[0].evidence["task_id"] == "task-1"
def test_skill_learning_service_generates_new_skill_for_task_without_published_skills(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
run_store = RunMemoryStore(tmp_path / "memory" / "runs")
learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
service = SkillLearningService(
run_store=run_store,
learning_store=learning_store,
draft_service=DraftService(store),
evidence_selector=EvidenceSelector(run_store),
)
now = datetime.now(timezone.utc).isoformat()
run_store.append_run_record(
RunRecord(
run_id="task-run-1",
session_id="session-task",
task_id="task-1",
attempt_index=1,
task_text="Generate migration checklist",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied"},
activated_skills=[],
validation_result={"accepted": True, "score": 0.9},
)
)
candidates = service.build_learning_candidates_for_task("task-1", trigger_run_id="task-run-1")
assert [candidate.candidate_id for candidate in candidates] == ["new:task:task-1"]
assert candidates[0].kind == "new_skill"
assert candidates[0].source_run_ids == ["task-run-1"]
def test_skill_learning_service_uses_original_task_text_for_new_skill_theme(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
run_store = RunMemoryStore(tmp_path / "memory" / "runs")
learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
service = SkillLearningService(
run_store=run_store,
learning_store=learning_store,
draft_service=DraftService(store),
evidence_selector=EvidenceSelector(run_store),
)
now = datetime.now(timezone.utc).isoformat()
run_store.append_run_record(
RunRecord(
run_id="task-run-1",
session_id="session-task",
task_id="task-1",
attempt_index=1,
task_text="Compare direct production restart with staging rollout",
started_at=now,
ended_at=now,
success=False,
finish_reason="stop",
feedback={"feedback_type": "revise", "comment": "I do not see the docs"},
activated_skills=[],
validation_result=None,
)
)
run_store.append_run_record(
RunRecord(
run_id="task-run-2",
session_id="session-task",
task_id="task-1",
attempt_index=2,
task_text="I do not see the docs",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
feedback={"feedback_type": "satisfied", "acceptance_type": "accept"},
activated_skills=[],
validation_result={"accepted": True, "score": 0.9},
)
)
candidates = service.build_learning_candidates_for_task("task-1", trigger_run_id="task-run-2")
assert [candidate.candidate_id for candidate in candidates] == ["new:task:task-1"]
assert candidates[0].evidence["theme"] == "Compare direct production restart with staging rollout"
assert candidates[0].evidence["task_text"] == "Compare direct production restart with staging rollout"
def test_skill_learning_service_handles_team_runs_without_attempt_index(tmp_path: Path) -> None:
store = SkillSpecStore(tmp_path)
run_store = RunMemoryStore(tmp_path / "memory" / "runs")
learning_store = SkillLearningStore(tmp_path / "memory" / "skills")
service = SkillLearningService(
run_store=run_store,
learning_store=learning_store,
draft_service=DraftService(store),
evidence_selector=EvidenceSelector(run_store),
)
now = datetime.now(timezone.utc).isoformat()
run_store.append_run_record(
RunRecord(
run_id="team-run",
session_id="session-task:team:research",
task_id="task-1",
attempt_index=None,
task_text="Research one product",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
)
)
run_store.append_run_record(
RunRecord(
run_id="main-run",
session_id="session-task",
task_id="task-1",
attempt_index=1,
task_text="Compare two products and email the report",
started_at=now,
ended_at=now,
success=True,
finish_reason="stop",
feedback={"acceptance_type": "accept"},
)
)
candidates = service.build_learning_candidates_for_task("task-1", final_accepted_run_id="main-run")
assert [candidate.candidate_id for candidate in candidates] == ["new:task:task-1"]
assert candidates[0].evidence["task_text"] == "Compare two products and email the report"
def test_task_theme_uses_first_sentence_for_chinese_text() -> None:
assert (
SkillLearningService._task_theme(
"帮我比较两种发布流程的风险:A 是直接重启线上容器,B 是先部署 staging 再切 production。请给出推荐方案、原因、验证步骤和回滚策略。"
)
== "帮我比较两种发布流程的风险:A 是直接重启线上容器,B 是先部署 staging 再切 production"
)
def test_agent_loop_records_skill_receipts_and_effects(tmp_path: Path) -> None:
skill = SkillContext(
name="docker-debug",
content="Use docker logs before editing config.",
version="v0007",
content_hash="hash-v7",
activation_reason="llm_selected",
tool_hints=["terminal"],
)
loader = EngineLoader(
workspace=tmp_path,
skill_assembler=StubSkillAssembler([skill]),
)
loop = AgentLoop(loader=loader)
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider(
[
LLMResponse(
content="Check the container logs first.",
finish_reason="stop",
provider_name="stub",
model="stub-model",
)
]
),
)
result = asyncio.run(loop.process_direct("Why is the Docker container crashing?", provider_bundle=bundle))
loaded = loop.boot()
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
activation = next(event for event in events if event.event_type == "skill_activation_snapshotted")
receipts = activation.event_payload["receipts"]
assert receipts == [
{
"run_id": result.run_id,
"session_id": result.session_id,
"skill_name": "docker-debug",
"skill_version": "v0007",
"content_hash": "hash-v7",
"activated_at": receipts[0]["activated_at"],
"activation_reason": "llm_selected",
"tool_hints": ["terminal"],
}
]
skill_effects = next(event for event in events if event.event_type == "skill_effects_snapshotted")
assert skill_effects.event_payload["run_record"]["activated_skills"][0]["skill_version"] == "v0007"
assert skill_effects.event_payload["skill_effects"][0]["skill_name"] == "docker-debug"
assert skill_effects.event_payload["candidate_generation_allowed"] is False
assert skill_effects.event_payload["learning_candidates"] == []
run_records = loaded.run_memory_store.list_runs()
effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
assert run_records[-1].run_id == result.run_id
assert effect_records[-1].run_id == result.run_id
def test_thinking_disabled_still_uses_skill_and_tool_assembly(tmp_path: Path) -> None:
skill = SkillContext(
name="docker-debug",
content="Use docker logs before editing config.",
version="v0007",
content_hash="hash-v7",
activation_reason="llm_selected",
tool_hints=["terminal"],
)
skill_assembler = StubSkillAssembler([skill])
tool_assembler = RecordingToolAssembler()
loader = EngineLoader(
workspace=tmp_path,
skill_assembler=skill_assembler,
tool_assembler=tool_assembler,
)
loop = AgentLoop(loader=loader)
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider(
[LLMResponse(content="Done", finish_reason="stop", provider_name="stub", model="stub-model")]
),
)
result = asyncio.run(
loop.process_direct(
"Why is the Docker container crashing?",
provider_bundle=bundle,
thinking_enabled=False,
)
)
loaded = loop.boot()
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
tool_selection = next(event for event in events if event.event_type == "tool_selection_snapshotted")
assert skill_assembler.calls
assert skill_assembler.calls[0]["thinking_enabled"] is False
assert tool_assembler.calls
assert [skill.name for skill in tool_assembler.calls[0]["activated_skills"]] == ["docker-debug"]
assert tool_selection.event_payload["tool_names"] == ["memory"]
def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path: Path) -> None:
skill = SkillContext(
name="docker-debug",
content="Use docker logs before editing config.",
version="v0007",
content_hash="hash-v7",
activation_reason="llm_selected",
tool_hints=["echo"],
)
loader = EngineLoader(
workspace=tmp_path,
skill_assembler=StubSkillAssembler([skill]),
)
loop = AgentLoop(loader=loader)
provider = StubProvider(
[
LLMResponse(
content="Need a tool.",
finish_reason="tool_calls",
tool_calls=[_tool_call()],
provider_name="stub",
model="stub-model",
),
LLMResponse(
content="Need another tool.",
finish_reason="tool_calls",
tool_calls=[_tool_call(call_id="call-2")],
provider_name="stub",
model="stub-model",
),
LLMResponse(
content="Based on the available tool result, the container likely failed during startup.",
finish_reason="stop",
provider_name="stub",
model="stub-model",
),
]
)
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=provider,
)
result = asyncio.run(
loop.process_direct(
"Why is the Docker container crashing?",
provider_bundle=bundle,
max_tool_iterations=1,
)
)
loaded = loop.boot()
assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
finalization_messages = provider.calls[-1]["messages"]
assistant_tool_call_ids = [
call["id"]
for message in finalization_messages
for call in message.get("tool_calls", [])
if message.get("role") == "assistant"
]
tool_result_ids = [
message.get("tool_call_id")
for message in finalization_messages
if message.get("role") == "tool"
]
assert "call-1" in assistant_tool_call_ids
assert "call-2" not in assistant_tool_call_ids
assert set(assistant_tool_call_ids).issubset(set(tool_result_ids))
effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
assert effect_records[-1].run_id == result.run_id
assert effect_records[-1].success is False
def test_agent_loop_suppresses_raw_tool_call_when_finalizing_after_tool_limit(tmp_path: Path) -> None:
loader = EngineLoader(
workspace=tmp_path,
skill_assembler=StubSkillAssembler([]),
)
loop = AgentLoop(loader=loader)
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider(
[
LLMResponse(
content="Need a tool.",
finish_reason="tool_calls",
tool_calls=[_tool_call()],
provider_name="stub",
model="stub-model",
),
LLMResponse(
content=(
"\n"
"\n"
"https://example.com\n"
"\n"
""
),
finish_reason="stop",
provider_name="stub",
model="stub-model",
),
]
),
)
result = asyncio.run(
loop.process_direct(
"Fetch the latest result",
provider_bundle=bundle,
max_tool_iterations=0,
)
)
assert result.finish_reason == "max_tool_iterations"
assert "" not in result.output_text
assert "raw tool call was suppressed" in result.output_text
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler([])))
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider(
[LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]
),
)
result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
loaded = loop.boot()
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
assert "message_count" in snapshot.event_payload
assert "tool_names" in snapshot.event_payload
assert "messages" not in snapshot.event_payload
assert "tools" not in snapshot.event_payload