diff --git a/app-instance/backend/beaver/memory/skills/store.py b/app-instance/backend/beaver/memory/skills/store.py index 7caefee..98424b5 100644 --- a/app-instance/backend/beaver/memory/skills/store.py +++ b/app-instance/backend/beaver/memory/skills/store.py @@ -4,7 +4,12 @@ from __future__ import annotations import json from pathlib import Path +import threading from uuid import uuid4 +from contextlib import contextmanager +from typing import Iterator + +from beaver.foundation.utils.file_lock import WorkspaceWriteLock from .models import ( SkillDraftEvalReport, @@ -16,9 +21,11 @@ from .models import ( class SkillLearningStore: - def __init__(self, root: str | Path) -> None: + def __init__(self, root: str | Path, *, write_lock: WorkspaceWriteLock | None = None) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) + self.write_lock = write_lock + self._local_lock = threading.RLock() self.performance_path = self.root / "performance.jsonl" self.candidates_path = self.root / "learning-candidates.jsonl" self.audit_path = self.root / "learning-audit.jsonl" @@ -26,42 +33,58 @@ class SkillLearningStore: self.eval_reports_dir = self.root / "eval-reports" def record_learning_candidate(self, candidate: SkillLearningCandidate) -> None: + self.record_learning_candidate_if_absent(candidate) + + def record_learning_candidate_if_absent( + self, + candidate: SkillLearningCandidate, + ) -> tuple[SkillLearningCandidate, bool]: normalized = SkillLearningCandidate.from_dict(candidate.to_dict()) - self._append_jsonl(self.candidates_path, normalized.to_dict()) - self.append_audit_event( - normalized.candidate_id, - "candidate_created", - { - "kind": normalized.kind, - "status": normalized.status, - "reason": normalized.reason, - }, - ) + with self._locked(): + existing = { + item.candidate_id: item + for item in self.list_learning_candidates() + } + found = existing.get(normalized.candidate_id) + if found is not None: + return found, False + self._append_jsonl(self.candidates_path, normalized.to_dict()) + self.append_audit_event( + normalized.candidate_id, + "candidate_created", + { + "kind": normalized.kind, + "status": normalized.status, + "reason": normalized.reason, + }, + ) + return normalized, True def update_learning_candidate(self, candidate_id: str, **updates: object) -> SkillLearningCandidate | None: - candidates = self.list_learning_candidates() - updated: SkillLearningCandidate | None = None - for index, candidate in enumerate(candidates): - if candidate.candidate_id != candidate_id: - continue - payload = candidate.to_dict() - payload.update(updates) - if "updated_at" not in updates: - payload["updated_at"] = _utc_now() - updated = SkillLearningCandidate.from_dict(payload) - candidates[index] = updated - break - if updated is None: - return None - self.candidates_path.parent.mkdir(parents=True, exist_ok=True) - self.candidates_path.write_text( - "".join( - json.dumps(candidate.to_dict(), ensure_ascii=False, sort_keys=True) + "\n" - for candidate in candidates - ), - encoding="utf-8", - ) - return updated + with self._locked(): + candidates = self.list_learning_candidates() + updated: SkillLearningCandidate | None = None + for index, candidate in enumerate(candidates): + if candidate.candidate_id != candidate_id: + continue + payload = candidate.to_dict() + payload.update(updates) + if "updated_at" not in updates: + payload["updated_at"] = _utc_now() + updated = SkillLearningCandidate.from_dict(payload) + candidates[index] = updated + break + if updated is None: + return None + self.candidates_path.parent.mkdir(parents=True, exist_ok=True) + self.candidates_path.write_text( + "".join( + json.dumps(candidate.to_dict(), ensure_ascii=False, sort_keys=True) + "\n" + for candidate in candidates + ), + encoding="utf-8", + ) + return updated def transition_learning_candidate( self, @@ -209,6 +232,15 @@ class SkillLearningStore: raise ValueError(f"Expected JSON object in {path}") return payload + @contextmanager + def _locked(self) -> Iterator[None]: + if self.write_lock is not None: + with self.write_lock.acquire(timeout_seconds=10): + yield + return + with self._local_lock: + yield + def _utc_now() -> str: from datetime import datetime, timezone diff --git a/app-instance/backend/beaver/plugins/skills.py b/app-instance/backend/beaver/plugins/skills.py index d24bd63..20de778 100644 --- a/app-instance/backend/beaver/plugins/skills.py +++ b/app-instance/backend/beaver/plugins/skills.py @@ -93,6 +93,22 @@ class PluginManager: finally: transaction.cleanup() + def sync_enabled(self, *, blocking: bool = True) -> dict[str, PluginState]: + results: dict[str, PluginState] = {} + with self.write_lock.acquire(timeout_seconds=10, blocking=blocking): + for state in self.state_store.list_plugins(): + manifest = self.manifests.get(state.plugin_id) + if not state.enabled or state.updates_paused: + results[state.plugin_id] = state + continue + if manifest is None: + state.status = "missing" + self.state_store.upsert_plugin(state) + results[state.plugin_id] = state + continue + results[state.plugin_id] = self._sync_plugin(state, manifest) + return results + def _prepare_initial_mirror( self, manifest: PluginManifest, @@ -158,6 +174,108 @@ class PluginManager: ) return prepared + def _sync_plugin(self, state: PluginState, manifest: PluginManifest) -> PluginState: + transaction = PluginSkillTransaction(self.workspace) + try: + for declaration in manifest.skills: + binding = state.skills.get(declaration.name) + if binding is None or not binding.accepted_upstream_tree_hash: + continue + snapshot = self.skill_store.stage_upstream_snapshot( + transaction, + skill_name=declaration.name, + source_kind="plugin", + source_id=manifest.plugin_id, + source_version=manifest.version, + source_path=declaration.relative_path, + source_root=declaration.root, + ) + self.skill_store.promote_upstream_snapshot(transaction, snapshot) + current = self.skill_store.read_published_skill(declaration.name) + if current is None: + continue + classification = classify_plugin_skill_update( + binding.accepted_upstream_tree_hash, + current.version.tree_hash, + snapshot.skill_tree_hash, + ) + binding.observed_upstream_tree_hash = snapshot.skill_tree_hash + binding.current_beaver_version = current.version.version + if classification == "unchanged": + binding.status = "synced" + continue + if classification == "already_applied": + binding.accepted_upstream_tree_hash = snapshot.skill_tree_hash + binding.accepted_beaver_version = current.version.version + binding.pending_candidate_id = None + binding.status = "synced" + continue + candidate = self._create_update_candidate( + plugin_id=manifest.plugin_id, + plugin_version=manifest.version, + skill_name=declaration.name, + merge_mode=classification, + base_upstream_tree_hash=binding.accepted_upstream_tree_hash, + new_upstream_tree_hash=snapshot.skill_tree_hash, + local_version=current.version.version, + ) + if binding.pending_candidate_id and binding.pending_candidate_id != candidate.candidate_id: + self.learning_store.transition_learning_candidate( + binding.pending_candidate_id, + "superseded", + event_type="plugin_update_superseded", + payload={"replacement_candidate_id": candidate.candidate_id}, + ) + recorded, _created = self.learning_store.record_learning_candidate_if_absent(candidate) + binding.pending_candidate_id = recorded.candidate_id + binding.status = "update_pending" + state.installed_version = manifest.version + state.manifest_path = manifest.display_path + if any(binding.status == "update_pending" for binding in state.skills.values()): + state.status = "update_pending" + else: + state.status = "synced" + self.state_store.upsert_plugin(state) + return state + finally: + transaction.cleanup() + + @staticmethod + def _create_update_candidate( + *, + plugin_id: str, + plugin_version: str, + skill_name: str, + merge_mode: str, + base_upstream_tree_hash: str, + new_upstream_tree_hash: str, + local_version: str, + ): + from beaver.memory.skills.models import SkillLearningCandidate + + candidate_id = f"plugin-update:{plugin_id}:{skill_name}:{new_upstream_tree_hash[:12]}" + return SkillLearningCandidate( + candidate_id=candidate_id, + kind="plugin_skill_update", + source_run_ids=[], + source_session_ids=[], + related_skill_names=[skill_name], + reason=f"Plugin {plugin_id} has an update for skill {skill_name}.", + evidence={ + "plugin_id": plugin_id, + "plugin_version": plugin_version, + "skill_name": skill_name, + "merge_mode": merge_mode, + "base_upstream_tree_hash": base_upstream_tree_hash, + "new_upstream_tree_hash": new_upstream_tree_hash, + "local_version": local_version, + }, + status="open", + priority=10, + confidence=1.0, + trigger_reason="plugin_update", + ) + def _publish_initial_mirror(self, item: dict[str, Any]) -> None: skill_name = str(item["skill_name"]) version: SkillVersion = item["version"] @@ -261,3 +379,13 @@ def _utc_now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat() + + +def classify_plugin_skill_update(base_tree: str, local_tree: str, upstream_tree: str) -> str: + if upstream_tree == base_tree: + return "unchanged" + if local_tree == upstream_tree: + return "already_applied" + if local_tree == base_tree: + return "fast_forward" + return "three_way" diff --git a/app-instance/backend/beaver/skills/specs/storage.py b/app-instance/backend/beaver/skills/specs/storage.py index 2539b1c..766de4c 100644 --- a/app-instance/backend/beaver/skills/specs/storage.py +++ b/app-instance/backend/beaver/skills/specs/storage.py @@ -174,8 +174,7 @@ class SkillSpecStore: version_dir = self._skill_dir(version.skill_name) / "versions" / version.version version_dir.mkdir(parents=True, exist_ok=True) self._write_text(version_dir / "SKILL.md", content) - if not version.tree_hash: - version.tree_hash = hash_plugin_skill_tree(version_dir).skill_tree_hash + version.tree_hash = hash_plugin_skill_tree(version_dir).skill_tree_hash self._write_json(version_dir / "version.json", version.to_dict()) def stage_upstream_snapshot( diff --git a/app-instance/backend/tests/unit/test_plugin_skill_sync.py b/app-instance/backend/tests/unit/test_plugin_skill_sync.py index 6fb6207..5af6349 100644 --- a/app-instance/backend/tests/unit/test_plugin_skill_sync.py +++ b/app-instance/backend/tests/unit/test_plugin_skill_sync.py @@ -8,7 +8,7 @@ import pytest from beaver.foundation.utils.file_lock import WorkspaceWriteLock from beaver.memory.skills import SkillLearningStore from beaver.plugins.discovery import discover_plugins -from beaver.plugins.skills import PluginManager +from beaver.plugins.skills import PluginManager, classify_plugin_skill_update from beaver.plugins.state import PluginStateStore from beaver.skills.catalog.loader import SkillsLoader from beaver.skills.learning.safety import SkillDraftSafetyChecker @@ -55,6 +55,24 @@ def _write_skill_plugin( return plugin_root +def _rewrite_plugin_version(plugin_root: Path, *, version: str, skill_text: str | None = None, template: str | None = None) -> None: + manifest_path = plugin_root / "beaver.plugin.json" + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + manifest["version"] = version + manifest_path.write_text(json.dumps(manifest), encoding="utf-8") + skill_name = manifest["skills"][0]["name"] + skill_root = plugin_root / "skills" / skill_name + if skill_text is not None: + (skill_root / "SKILL.md").write_text( + "---\nname: {0}\ndescription: Comic workflow\ntools: []\n---\n\n{1}".format(skill_name, skill_text), + encoding="utf-8", + ) + if template is not None: + target = skill_root / "templates" / "panel.txt" + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(template, encoding="utf-8") + + def _manager(workspace: Path) -> PluginManager: discovery = discover_plugins(workspace, search_paths=[]) skill_store = SkillSpecStore(workspace) @@ -143,3 +161,76 @@ def test_enable_plugin_is_idempotent(tmp_path: Path) -> None: assert first.status == "synced" assert second.status == "synced" assert SkillSpecStore(workspace).list_versions("baoyu-comic") == ["v0001"] + + +@pytest.mark.parametrize( + ("base", "local", "upstream", "expected"), + [ + ("A", "A", "A", "unchanged"), + ("A", "B", "B", "already_applied"), + ("A", "A", "B", "fast_forward"), + ("A", "LOCAL", "UPSTREAM", "three_way"), + ], +) +def test_classify_plugin_skill_update(base: str, local: str, upstream: str, expected: str) -> None: + assert classify_plugin_skill_update(base, local, upstream) == expected + + +def test_sync_enabled_creates_idempotent_fast_forward_candidate_for_supporting_file_update(tmp_path: Path) -> None: + workspace = tmp_path / "workspace" + plugin_root = _write_skill_plugin(workspace / "plugins", extra_files={"templates/panel.txt": "v1"}) + manager = _manager(workspace) + manager.enable("baoyu-comic") + _rewrite_plugin_version(plugin_root, version="1.1.0", template="v2") + + first = _manager(workspace).sync_enabled() + second = _manager(workspace).sync_enabled() + candidates = SkillLearningStore(workspace / "memory" / "skills").list_learning_candidates() + + assert first["baoyu-comic"].skills["baoyu-comic"].status == "update_pending" + assert second["baoyu-comic"].skills["baoyu-comic"].status == "update_pending" + assert len(candidates) == 1 + candidate = candidates[0] + assert candidate.kind == "plugin_skill_update" + assert candidate.candidate_id.startswith("plugin-update:baoyu-comic:baoyu-comic:") + assert candidate.evidence["merge_mode"] == "fast_forward" + assert "Draw panels" not in json.dumps(candidate.evidence) + + +def test_sync_enabled_creates_three_way_candidate_when_local_diverged(tmp_path: Path) -> None: + workspace = tmp_path / "workspace" + plugin_root = _write_skill_plugin(workspace / "plugins") + manager = _manager(workspace) + manager.enable("baoyu-comic") + store = SkillSpecStore(workspace) + loaded = store.read_published_skill("baoyu-comic") + assert loaded is not None + local_version = loaded.version + local_version.version = "v0002" + local_version.parent_version = "v0001" + store.write_skill_version(local_version, loaded.content + "\nLocal learning.\n") + store.set_current_version("baoyu-comic", "v0002") + _rewrite_plugin_version(plugin_root, version="1.1.0", skill_text="# Baoyu Comic\n\nUpstream change.\n") + + _manager(workspace).sync_enabled() + candidate = SkillLearningStore(workspace / "memory" / "skills").list_learning_candidates()[0] + + assert candidate.evidence["merge_mode"] == "three_way" + assert candidate.evidence["local_version"] == "v0002" + + +def test_sync_enabled_supersedes_stale_pending_update(tmp_path: Path) -> None: + workspace = tmp_path / "workspace" + plugin_root = _write_skill_plugin(workspace / "plugins") + _manager(workspace).enable("baoyu-comic") + _rewrite_plugin_version(plugin_root, version="1.1.0", skill_text="# Baoyu Comic\n\nFirst update.\n") + _manager(workspace).sync_enabled() + first_candidate = SkillLearningStore(workspace / "memory" / "skills").list_learning_candidates()[0] + + _rewrite_plugin_version(plugin_root, version="1.2.0", skill_text="# Baoyu Comic\n\nSecond update.\n") + _manager(workspace).sync_enabled() + candidates = SkillLearningStore(workspace / "memory" / "skills").list_learning_candidates() + + assert len(candidates) == 2 + assert {candidate.status for candidate in candidates} == {"open", "superseded"} + assert any(candidate.candidate_id != first_candidate.candidate_id for candidate in candidates) diff --git a/app-instance/backend/tests/unit/test_skill_learning_candidate_state.py b/app-instance/backend/tests/unit/test_skill_learning_candidate_state.py index 75888ad..1ea5fb2 100644 --- a/app-instance/backend/tests/unit/test_skill_learning_candidate_state.py +++ b/app-instance/backend/tests/unit/test_skill_learning_candidate_state.py @@ -76,6 +76,35 @@ def test_legacy_candidate_payload_is_backward_compatible(tmp_path: Path) -> None assert candidate.updated_at +def test_record_learning_candidate_if_absent_is_idempotent(tmp_path: Path) -> None: + store = SkillLearningStore(tmp_path) + candidate = SkillLearningCandidate( + candidate_id="plugin-update:baoyu-comic:baoyu-comic:abcdef123456", + kind="plugin_skill_update", + source_run_ids=[], + source_session_ids=[], + related_skill_names=["baoyu-comic"], + reason="Plugin update", + evidence={ + "plugin_id": "baoyu-comic", + "plugin_version": "1.1.0", + "skill_name": "baoyu-comic", + "merge_mode": "fast_forward", + "base_upstream_tree_hash": "old", + "new_upstream_tree_hash": "new", + "local_version": "v0001", + }, + ) + + first, first_created = store.record_learning_candidate_if_absent(candidate) + second, second_created = store.record_learning_candidate_if_absent(candidate) + + assert first_created is True + assert second_created is False + assert first.candidate_id == second.candidate_id + assert len(store.list_learning_candidates()) == 1 + + def test_safety_and_eval_reports_round_trip(tmp_path: Path) -> None: store = SkillLearningStore(tmp_path) safety = SkillDraftSafetyReport(