"""Lightweight replay/eval reports for skill drafts.""" from __future__ import annotations from uuid import uuid4 from beaver.engine.context import SkillContext from beaver.engine.providers import ProviderBundle from beaver.memory.runs import RunMemoryStore from beaver.memory.skills import SkillDraftEvalReport, SkillLearningCandidate from beaver.skills.learning.case_selection import select_replay_cases from beaver.skills.learning.preservation import check_preservation from beaver.skills.learning.replay import ReplayArmRequest, ReplayRunner from beaver.skills.learning.surrogate import SurrogateToolEvaluator from beaver.skills.specs import SkillDraft class SkillDraftEvaluator: """Builds a bounded eval report without writing user-visible sessions.""" def __init__( self, run_store: RunMemoryStore, *, surrogate_evaluator: SurrogateToolEvaluator | None = None, ) -> None: self.run_store = run_store self.surrogate_evaluator = surrogate_evaluator or SurrogateToolEvaluator() async def evaluate( self, *, candidate: SkillLearningCandidate, draft: SkillDraft, provider_bundle: ProviderBundle | None, replay_runner: ReplayRunner | None = None, ) -> SkillDraftEvalReport: if provider_bundle is None or provider_bundle.main_provider is None: return self._skipped(candidate, draft) runs = self.run_store.list_runs() replay_cases = select_replay_cases(candidate, runs) if replay_runner is not None and replay_cases: return await self._evaluate_replay( candidate=candidate, draft=draft, replay_cases=replay_cases, provider_bundle=provider_bundle, replay_runner=replay_runner, ) return self._evaluate_heuristic(candidate, draft, runs) def _evaluate_heuristic( self, candidate: SkillLearningCandidate, draft: SkillDraft, runs: list, ) -> SkillDraftEvalReport: runs_by_id = {record.run_id: record for record in runs} cases: list[dict] = [] for run_id in candidate.source_run_ids[:8]: record = runs_by_id.get(run_id) if record is None: continue baseline = _score_from_validation(record.validation_result, record.success) candidate_score = _candidate_score(baseline, draft) cases.append( { "run_id": run_id, "session_id": record.session_id, "baseline_score": baseline, "candidate_score": candidate_score, "delta": round(candidate_score - baseline, 4), } ) if not cases: cases.append( { "run_id": "", "session_id": "", "baseline_score": 0.75, "candidate_score": _candidate_score(0.75, draft), "delta": round(_candidate_score(0.75, draft) - 0.75, 4), } ) baseline_avg = sum(item["baseline_score"] for item in cases) / len(cases) candidate_avg = sum(item["candidate_score"] for item in cases) / len(cases) regressions = [item for item in cases if item["candidate_score"] < item["baseline_score"]] improved = [item for item in cases if item["candidate_score"] > item["baseline_score"]] unchanged = len(cases) - len(regressions) - len(improved) score_delta = candidate_avg - baseline_avg passed = not (len(regressions) > 0 and score_delta <= 0) and candidate_avg >= 0.75 return SkillDraftEvalReport( report_id=uuid4().hex, skill_name=draft.skill_name, draft_id=draft.draft_id, candidate_id=candidate.candidate_id, passed=passed, baseline_score_avg=round(baseline_avg, 4), candidate_score_avg=round(candidate_avg, 4), score_delta=round(score_delta, 4), regression_count=len(regressions), improved_count=len(improved), unchanged_count=unchanged, cases=cases, status="completed", created_at=_utc_now(), ) async def _evaluate_replay( self, *, candidate: SkillLearningCandidate, draft: SkillDraft, replay_cases: list[dict], provider_bundle: ProviderBundle, replay_runner: ReplayRunner, ) -> SkillDraftEvalReport: case_reports: list[dict] = [] legacy_cases: list[dict] = [] for case in replay_cases: baseline = await replay_runner.run_arm( ReplayArmRequest( case_id=f"{case['run_id']}:baseline", arm="baseline", task_text=str(case["task_text"]), pinned_skill_names=list(case.get("baseline_skill_names") or []), pinned_skill_contexts=[], provider_bundle=provider_bundle, model_settings={"max_tool_iterations": 4, "temperature": 0.0}, ) ) candidate_arm = await replay_runner.run_arm( ReplayArmRequest( case_id=f"{case['run_id']}:candidate", arm="candidate", task_text=str(case["task_text"]), pinned_skill_names=[], pinned_skill_contexts=[_draft_skill_context(draft)], provider_bundle=provider_bundle, model_settings={"max_tool_iterations": 4, "temperature": 0.0}, ) ) surrogate = await self.surrogate_evaluator.evaluate( task_text=str(case["task_text"]), baseline=baseline, candidate=candidate_arm, ) baseline_score = surrogate["baseline_score"] candidate_score = surrogate["candidate_score"] case_report = { "run_id": case["run_id"], "task_id": case.get("task_id"), "session_id": case.get("session_id"), "baseline": baseline, "candidate": candidate_arm, "baseline_score": baseline_score, "candidate_score": candidate_score, "delta": round(candidate_score - baseline_score, 4), "execution_coverage": _arm_mode_coverage(baseline, candidate_arm, "executed"), "surrogate_coverage": _arm_mode_coverage(baseline, candidate_arm, "surrogate"), "blocked_tool_count": _arm_mode_count(baseline, candidate_arm, "blocked"), "confidence": surrogate["confidence"], "tool_calls": [*baseline.get("tool_calls", []), *candidate_arm.get("tool_calls", [])], "artifacts": [*baseline.get("artifacts", []), *candidate_arm.get("artifacts", [])], "side_effects": [*baseline.get("side_effects", []), *candidate_arm.get("side_effects", [])], "validator_notes": list(surrogate.get("notes") or []), } case_reports.append(case_report) legacy_cases.append( { "run_id": case["run_id"], "session_id": case.get("session_id") or "", "baseline_score": baseline_score, "candidate_score": candidate_score, "delta": round(candidate_score - baseline_score, 4), } ) preservation_report = _preservation_report(candidate, draft) return _report_from_case_reports(candidate, draft, case_reports, legacy_cases, preservation_report) def _skipped(self, candidate: SkillLearningCandidate, draft: SkillDraft) -> SkillDraftEvalReport: return SkillDraftEvalReport( report_id=uuid4().hex, skill_name=draft.skill_name, draft_id=draft.draft_id, candidate_id=candidate.candidate_id, passed=True, baseline_score_avg=0.0, candidate_score_avg=0.0, score_delta=0.0, regression_count=0, improved_count=0, unchanged_count=0, cases=[], status="skipped_provider_unavailable", created_at=_utc_now(), ) def _score_from_validation(validation: dict | None, success: bool) -> float: if isinstance(validation, dict) and "score" in validation: try: return max(0.0, min(1.0, float(validation.get("score") or 0.0))) except (TypeError, ValueError): pass return 0.8 if success else 0.4 def _candidate_score(baseline: float, draft: SkillDraft) -> float: content = draft.proposed_content.strip() if not content and draft.proposal_kind != "retire_skill": return 0.0 if "regression" in content.lower(): return max(0.0, baseline - 0.2) return min(1.0, max(0.75, baseline + 0.05)) def _draft_skill_context(draft: SkillDraft) -> SkillContext: tool_hints = draft.proposed_frontmatter.get("tools") return SkillContext( name=f"draft:{draft.skill_name}", content=draft.proposed_content, version=draft.draft_id, content_hash="draft", activation_reason="skill_replay_eval_candidate", tool_hints=[str(item) for item in tool_hints if str(item).strip()] if isinstance(tool_hints, list) else [], ) def _preservation_report(candidate: SkillLearningCandidate, draft: SkillDraft) -> dict | None: if candidate.kind not in {"revise_skill", "merge_skills"}: return None base_content = str(candidate.evidence.get("base_content") or "") if isinstance(candidate.evidence, dict) else "" if not base_content.strip(): return None return check_preservation(base_content=base_content, draft_content=draft.proposed_content) def _report_from_case_reports( candidate: SkillLearningCandidate, draft: SkillDraft, case_reports: list[dict], legacy_cases: list[dict], preservation_report: dict | None, ) -> SkillDraftEvalReport: baseline_avg = sum(item["baseline_score"] for item in legacy_cases) / len(legacy_cases) candidate_avg = sum(item["candidate_score"] for item in legacy_cases) / len(legacy_cases) regressions = [item for item in legacy_cases if item["candidate_score"] < item["baseline_score"]] improved = [item for item in legacy_cases if item["candidate_score"] > item["baseline_score"]] unchanged = len(legacy_cases) - len(regressions) - len(improved) execution, surrogate, blocked = _coverage(case_reports) confidence = _confidence(execution, surrogate, blocked, [item.get("confidence") for item in case_reports]) score_delta = candidate_avg - baseline_avg passed = candidate_avg >= 0.75 and not (regressions and score_delta <= 0) and blocked < 1.0 return SkillDraftEvalReport( report_id=uuid4().hex, skill_name=draft.skill_name, draft_id=draft.draft_id, candidate_id=candidate.candidate_id, passed=passed, baseline_score_avg=round(baseline_avg, 4), candidate_score_avg=round(candidate_avg, 4), score_delta=round(score_delta, 4), regression_count=len(regressions), improved_count=len(improved), unchanged_count=unchanged, cases=legacy_cases, status="completed", created_at=_utc_now(), eval_version="replay-v1", mode="replay", execution_coverage=execution, surrogate_coverage=surrogate, blocked_coverage=blocked, confidence=confidence, case_reports=case_reports, tool_mode_summary={"executed": execution, "surrogate": surrogate, "blocked": blocked}, preservation_report=preservation_report, ) def _coverage(case_reports: list[dict]) -> tuple[float, float, float]: counts = {"executed": 0, "surrogate": 0, "blocked": 0} for report in case_reports: for call in report.get("tool_calls") or []: if isinstance(call, dict) and call.get("mode") in counts: counts[str(call["mode"])] += 1 total = sum(counts.values()) if total == 0: return 1.0, 0.0, 0.0 return ( round(counts["executed"] / total, 4), round(counts["surrogate"] / total, 4), round(counts["blocked"] / total, 4), ) def _confidence(execution: float, surrogate: float, blocked: float, case_confidences: list[object]) -> str: if blocked > 0.0: return "low" if execution >= 0.75 and surrogate <= 0.25: return "high" if execution >= 0.25 or "medium" in case_confidences: return "medium" return "low" def _arm_mode_coverage(baseline: dict, candidate: dict, mode: str) -> float: calls = [*baseline.get("tool_calls", []), *candidate.get("tool_calls", [])] if not calls: return 1.0 if mode == "executed" else 0.0 return round(sum(1 for call in calls if isinstance(call, dict) and call.get("mode") == mode) / len(calls), 4) def _arm_mode_count(baseline: dict, candidate: dict, mode: str) -> int: calls = [*baseline.get("tool_calls", []), *candidate.get("tool_calls", [])] return sum(1 for call in calls if isinstance(call, dict) and call.get("mode") == mode) def _utc_now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat()