"""Built-in cron tool for managing scheduled Beaver Tasks.""" from __future__ import annotations import json from typing import Any from beaver.services.cron_service import CronService, schedule_from_api from beaver.tools.base import BaseTool, ToolContext, ToolResult, ToolSpec CRON_TOOL_DESCRIPTION = ( "Create and manage scheduled Beaver notifications or Tasks. Notification mode " "sends scheduled results to the fixed notification session; task mode creates " "a Task run. Actions: add, list, remove, toggle, run." ) CRON_TOOL_PARAMETERS: dict[str, Any] = { "type": "object", "properties": { "action": { "type": "string", "enum": ["add", "list", "remove", "toggle", "run"], "description": "The scheduled-task operation to perform.", }, "name": { "type": "string", "description": "Short scheduled-task name. Optional for add.", }, "message": { "type": "string", "description": "The task instruction to run when the schedule triggers. Required for add.", }, "schedule": { "type": "string", "description": "Schedule expression, for example 'every 15m', '0 9 * * *', or an ISO datetime.", }, "every_seconds": { "type": "integer", "minimum": 1, "description": "Fixed interval in seconds for recurring scheduled tasks.", }, "cron_expr": { "type": "string", "description": "Cron expression such as '0 9 * * *'.", }, "tz": { "type": "string", "description": "IANA timezone for cron_expr, for example 'Asia/Shanghai'.", }, "at_iso": { "type": "string", "description": "ISO datetime for one-time scheduled tasks.", }, "job_id": { "type": "string", "description": "Scheduled-task ID for remove, toggle, or run.", }, "enabled": { "type": "boolean", "description": "Whether the scheduled task should be enabled when action is toggle.", }, "mode": { "type": "string", "enum": ["notification", "task"], "description": "Use notification for reminders/reports; use task only when the scheduled work requires Task tracking.", }, "requires_followup": { "type": "boolean", "description": "Whether a task-mode scheduled run should appear as an active task awaiting user follow-up.", }, }, "required": ["action"], } class CronTool(BaseTool): """Tool-facing wrapper around the process CronService.""" @property def spec(self) -> ToolSpec: return ToolSpec( name="cron", description=CRON_TOOL_DESCRIPTION, input_schema=CRON_TOOL_PARAMETERS, toolset="cron", always_available=False, ) async def invoke(self, arguments: dict[str, Any], context: ToolContext) -> ToolResult: try: result = await self._invoke(arguments, context) return ToolResult( success=bool(result.get("success", True)), content=json.dumps(result, ensure_ascii=False), tool_name=self.spec.name, error=str(result.get("error")) if result.get("error") else None, raw_output=result, ) except Exception as exc: return ToolResult( success=False, content=json.dumps({"success": False, "error": str(exc)}, ensure_ascii=False), tool_name=self.spec.name, error=str(exc), ) async def _invoke(self, arguments: dict[str, Any], context: ToolContext) -> dict[str, Any]: service = self._resolve_cron_service(context) action = str(arguments.get("action") or "").strip().lower() if action == "add": schedule = schedule_from_api(arguments) job = service.add_job( name=str(arguments.get("name") or "").strip(), message=str(arguments.get("message") or "").strip(), schedule=schedule, session_key=str(arguments.get("session_key") or context.session_id or "").strip() or None, payload_kind="agent_turn", mode=str(arguments.get("mode") or "notification").strip().lower(), requires_followup=bool(arguments.get("requires_followup", False)), ) return {"success": True, "job": job.to_api_dict()} if action == "list": include_disabled = bool(arguments.get("include_disabled", True)) return { "success": True, "jobs": [job.to_api_dict() for job in service.list_jobs(include_disabled=include_disabled)], } if action == "remove": job_id = _required_job_id(arguments) return {"success": service.remove_job(job_id), "job_id": job_id} if action == "toggle": job_id = _required_job_id(arguments) job = service.update_enabled(job_id, bool(arguments.get("enabled", True))) if job is None: return {"success": False, "error": f"Scheduled task {job_id!r} was not found."} return {"success": True, "job": job.to_api_dict()} if action == "run": job_id = _required_job_id(arguments) ok = await service.run_job(job_id, force=True) job = service.get_job(job_id) return { "success": ok, "job_id": job_id, "job": job.to_api_dict() if job is not None else None, } return {"success": False, "error": "action must be one of: add, list, remove, toggle, run"} @staticmethod def _resolve_cron_service(context: ToolContext) -> CronService: service = context.get("cron_service") if isinstance(service, CronService): return service if not context.workspace: raise RuntimeError("Cron service is unavailable for this runtime.") return CronService(f"{context.workspace}/cron/jobs.json") def _required_job_id(arguments: dict[str, Any]) -> str: job_id = str(arguments.get("job_id") or "").strip() if not job_id: raise ValueError("job_id is required") return job_id