"""Background worker for assisted skill learning."""

from __future__ import annotations

import asyncio
import logging
import os
from dataclasses import dataclass, field
from typing import Callable

from beaver.engine.providers import ProviderBundle
from beaver.memory.skills import SkillLearningCandidate
from beaver.skills.learning.pipeline import SkillLearningPipelineService

logger = logging.getLogger(__name__)


@dataclass(slots=True)
class SkillLearningWorkerConfig:
    """Tunables for :class:`SkillLearningWorker`.

    :meth:`from_env` reads these from ``BEAVER_SKILL_LEARNING_*`` environment
    variables. Unset or blank variables, and unparsable numbers, fall back to
    the defaults declared here.
    """

    # When False, ``run_forever`` returns immediately and ``run_once`` is a no-op.
    enabled: bool = True
    # Upper bound on candidates processed per ``run_once`` call (negative acts as 0).
    max_drafts_per_run: int = 5
    # Candidates whose retry_count has reached this value are no longer selected.
    max_retries: int = 3
    # Pause between iterations of ``run_forever`` (passed to ``asyncio.sleep``).
    interval_seconds: float = 300.0

    @classmethod
    def from_env(cls) -> "SkillLearningWorkerConfig":
        """Build a config from environment variables, using defaults for unset/invalid values."""
        return cls(
            enabled=_env_bool("BEAVER_SKILL_LEARNING_WORKER_ENABLED", True),
            max_drafts_per_run=_env_int("BEAVER_SKILL_LEARNING_MAX_DRAFTS_PER_RUN", 5),
            max_retries=_env_int("BEAVER_SKILL_LEARNING_MAX_RETRIES", 3),
            # Parsed leniently like the int settings: a malformed value falls
            # back to the default instead of raising ValueError at startup.
            interval_seconds=_env_float("BEAVER_SKILL_LEARNING_INTERVAL_SECONDS", 300.0),
        )


@dataclass(slots=True)
class SkillLearningWorkerResult:
    """Counters and failure details collected by one ``run_once`` call."""

    # Candidates taken from the selection; equals succeeded + failed + skipped.
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    # One {"candidate_id": ..., "error": ...} entry per failed candidate.
    failures: list[dict[str, str]] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot; failure entries are shallow-copied."""
        return {
            "processed": self.processed,
            "succeeded": self.succeeded,
            "failed": self.failed,
            "skipped": self.skipped,
            "failures": [dict(item) for item in self.failures],
        }


class SkillLearningWorker:
    """Synthesizes drafts for open candidates; never approves or publishes."""

    # Candidate statuses treated as already owning a draft for their skill
    # names. A new candidate targeting one of those names is superseded
    # instead of synthesized.
    _ACTIVE_DRAFT_STATUSES = {"queued", "synthesizing", "draft_ready", "review_pending", "approved"}

    def __init__(
        self,
        *,
        pipeline: SkillLearningPipelineService,
        provider_bundle_factory: Callable[[], ProviderBundle],
        config: SkillLearningWorkerConfig | None = None,
    ) -> None:
        """Create a worker.

        Args:
            pipeline: Service used to list, transition, synthesize and evaluate candidates.
            provider_bundle_factory: Called once per synthesize/evaluate call to get providers.
            config: Worker settings; defaults to ``SkillLearningWorkerConfig.from_env()``.
        """
        self.pipeline = pipeline
        self.provider_bundle_factory = provider_bundle_factory
        self.config = config if config is not None else SkillLearningWorkerConfig.from_env()
        self._running = False
        # Serializes run_once so concurrent callers never process the same batch twice.
        self._lock = asyncio.Lock()

    async def run_forever(self) -> None:
        """Call :meth:`run_once` every ``interval_seconds`` until :meth:`stop` is called.

        Returns immediately when the worker is disabled. An exception from a
        single run is logged and the loop keeps going, so a transient pipeline
        error does not permanently stop the background worker. Cancellation
        (``asyncio.CancelledError``, a ``BaseException``) still propagates.
        """
        if not self.config.enabled:
            return
        self._running = True
        try:
            while self._running:
                try:
                    await self.run_once()
                except Exception:
                    logger.exception(
                        "Skill learning worker run failed; retrying in %s seconds",
                        self.config.interval_seconds,
                    )
                # Honour a stop() issued during the run without waiting a full interval.
                if not self._running:
                    break
                await asyncio.sleep(self.config.interval_seconds)
        finally:
            self._running = False

    def stop(self) -> None:
        """Ask ``run_forever`` to exit after the current run or sleep finishes."""
        self._running = False

    async def run_once(self) -> SkillLearningWorkerResult:
        """Process up to ``max_drafts_per_run`` selected candidates, best first.

        Each candidate is counted as succeeded, skipped (superseded by an
        existing active draft) or failed. A failure is recorded on the
        candidate via ``_mark_failure`` and the batch continues.

        Returns:
            The counters for this run (all zero when the worker is disabled).
        """
        if not self.config.enabled:
            return SkillLearningWorkerResult()
        async with self._lock:
            result = SkillLearningWorkerResult()
            # Clamp: a negative limit would otherwise slice from the end
            # (``xs[:-1]``) and process nearly every candidate.
            limit = max(0, self.config.max_drafts_per_run)
            for candidate in self._select_candidates()[:limit]:
                result.processed += 1
                try:
                    handled = await self._process_candidate(candidate)
                except Exception as exc:
                    error = str(exc)
                    result.failed += 1
                    result.failures.append({"candidate_id": candidate.candidate_id, "error": error})
                    self._mark_failure(candidate, error)
                    continue
                if handled:
                    result.succeeded += 1
                else:
                    result.skipped += 1
            return result

    def _select_candidates(self) -> list[SkillLearningCandidate]:
        """Return ``open`` candidates still under the retry limit.

        Sorted descending by ``(priority, confidence, created_at)``. Among
        equal priority/confidence, larger ``created_at`` comes first
        (presumably the newest; confirm the field's semantics).
        """
        candidates = [
            item
            for item in self.pipeline.list_candidates()
            if item.status == "open" and item.retry_count < self.config.max_retries
        ]
        return sorted(candidates, key=lambda item: (item.priority, item.confidence, item.created_at), reverse=True)

    async def _process_candidate(self, candidate: SkillLearningCandidate) -> bool:
        """Synthesize a draft for *candidate* and evaluate it if it passes the safety check.

        Returns:
            False when the candidate was superseded because another candidate
            already has an active draft for the same skill. True once a draft
            has been synthesized, whether or not it was then evaluated.

        Raises:
            Whatever the pipeline raises; the caller records it as a failure.
        """
        if self._has_active_draft(candidate):
            self.pipeline.mark_candidate_superseded(candidate.candidate_id, "active draft already exists for this skill")
            return False
        self.pipeline.mark_candidate_queued(candidate.candidate_id)
        self.pipeline.mark_candidate_synthesizing(candidate.candidate_id)
        draft = await self.pipeline.synthesize_draft(
            candidate.candidate_id,
            provider_bundle=self.provider_bundle_factory(),
        )
        self.pipeline.mark_draft_synthesized(candidate.candidate_id, draft)
        safety = self.pipeline.check_safety(draft.skill_name, draft.draft_id)
        if not safety.passed or safety.risk_level == "critical":
            # Drafts failing the safety gate are not evaluated, but the
            # synthesis itself succeeded, so the candidate counts as handled.
            return True
        await self.pipeline.evaluate_draft(
            candidate.candidate_id,
            draft.skill_name,
            draft.draft_id,
            provider_bundle=self.provider_bundle_factory(),
        )
        return True

    def _has_active_draft(self, candidate: SkillLearningCandidate) -> bool:
        """Return True if a different candidate in an active status shares a skill name with *candidate*."""
        target_names = self._skill_names(candidate)
        if not target_names:
            return False
        # Re-listed on every call so drafts started earlier in this run are seen.
        return any(
            item.candidate_id != candidate.candidate_id
            and item.status in self._ACTIVE_DRAFT_STATUSES
            and not target_names.isdisjoint(self._skill_names(item))
            for item in self.pipeline.list_candidates()
        )

    @staticmethod
    def _skill_names(candidate: SkillLearningCandidate) -> set:
        """Collect ``related_skill_names`` plus ``draft_skill_name`` (when truthy) for *candidate*."""
        names = set(candidate.related_skill_names)
        if candidate.draft_skill_name:
            names.add(candidate.draft_skill_name)
        return names

    def _mark_failure(self, candidate: SkillLearningCandidate, error: str) -> None:
        """Record a failed attempt, flagging it terminal once ``max_retries`` is reached."""
        retry_count = candidate.retry_count + 1
        self.pipeline.mark_candidate_failed(
            candidate.candidate_id,
            error,
            retry_count=retry_count,
            terminal=retry_count >= self.config.max_retries,
        )


def _env_bool(name: str, default: bool) -> bool:
    """Read a boolean env var; unset or blank returns *default*.

    Any other value is True unless it is ``0``/``false``/``no``/``off``
    (case-insensitive, surrounding whitespace ignored).
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    return raw.strip().lower() not in {"0", "false", "no", "off"}


def _env_int(name: str, default: int) -> int:
    """Read an integer env var; unset, blank, or unparsable returns *default*."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        return default


def _env_float(name: str, default: float) -> float:
    """Read a float env var; unset, blank, or unparsable returns *default*."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError:
        return default