Files
steven_li 3b0af173cc refactor(beaver): 移除Hermes相关引用和迁移代码,完善Beaver后端主线实现
移除了所有Hermes相关的命名引用,包括:
- 从.gitignore中清理相关构建缓存文件
- 将README中的beaver-home路径配置更新
- 完善backend/README.md文档说明Beaver后端主线实现
- 移除Hermes风格的相关注释和兼容性代码
- 清理nanobot环境变量兼容性处理
- 删除技能迁移和服务迁移相关功能代码
- 更新测试用例中相关命名和函数名

BREAKING CHANGE: 移除了Hermes迁移相关API和CLI命令,不再支持nanobot环境变量兼容性
2026-05-14 17:20:32 +08:00

164 lines
6.3 KiB
Python

"""Built-in cron tool for managing scheduled Beaver Tasks."""
from __future__ import annotations
import json
from typing import Any
from beaver.services.cron_service import CronService, schedule_from_api
from beaver.tools.base import BaseTool, ToolContext, ToolResult, ToolSpec
# Human/LLM-facing summary of the cron tool; one sentence per source line.
CRON_TOOL_DESCRIPTION = (
    "Create and manage scheduled Beaver notifications or Tasks. "
    "Notification mode sends scheduled results to the fixed notification session; "
    "task mode creates a Task run. "
    "Actions: add, list, remove, toggle, run."
)
# JSON-Schema description of the arguments accepted by the cron tool.
# Only ``action`` is required by the schema itself.
CRON_TOOL_PARAMETERS: dict[str, Any] = {
    "type": "object",
    "properties": {
        # --- which operation to perform -------------------------------
        "action": {
            "type": "string",
            "enum": ["add", "list", "remove", "toggle", "run"],
            "description": "The scheduled-task operation to perform.",
        },
        # --- what to schedule (used by ``add``) -----------------------
        "name": {
            "type": "string",
            "description": "Short scheduled-task name. Optional for add.",
        },
        "message": {
            "type": "string",
            "description": (
                "The task instruction to run when the schedule triggers. "
                "Required for add."
            ),
        },
        # --- when to run it: alternative schedule spellings -----------
        "schedule": {
            "type": "string",
            "description": (
                "Schedule expression, for example 'every 15m', '0 9 * * *', "
                "or an ISO datetime."
            ),
        },
        "every_seconds": {
            "type": "integer",
            "minimum": 1,
            "description": "Fixed interval in seconds for recurring scheduled tasks.",
        },
        "cron_expr": {
            "type": "string",
            "description": "Cron expression such as '0 9 * * *'.",
        },
        "tz": {
            "type": "string",
            "description": "IANA timezone for cron_expr, for example 'Asia/Shanghai'.",
        },
        "at_iso": {
            "type": "string",
            "description": "ISO datetime for one-time scheduled tasks.",
        },
        # --- targeting an existing scheduled task ---------------------
        "job_id": {
            "type": "string",
            "description": "Scheduled-task ID for remove, toggle, or run.",
        },
        "enabled": {
            "type": "boolean",
            "description": (
                "Whether the scheduled task should be enabled "
                "when action is toggle."
            ),
        },
        # --- how a triggered run is delivered -------------------------
        "mode": {
            "type": "string",
            "enum": ["notification", "task"],
            "description": (
                "Use notification for reminders/reports; use task only when "
                "the scheduled work requires Task tracking."
            ),
        },
        "requires_followup": {
            "type": "boolean",
            "description": (
                "Whether a task-mode scheduled run should appear as an "
                "active task awaiting user follow-up."
            ),
        },
    },
    "required": ["action"],
}
class CronTool(BaseTool):
    """Tool-facing wrapper around the process CronService.

    Dispatches the ``add``, ``list``, ``remove``, ``toggle`` and ``run``
    actions to a ``CronService`` and reports every outcome, including
    failures, as a JSON-encoded ``ToolResult``.
    """

    @property
    def spec(self) -> ToolSpec:
        """Return the tool specification registered under the name ``cron``."""
        return ToolSpec(
            name="cron",
            description=CRON_TOOL_DESCRIPTION,
            input_schema=CRON_TOOL_PARAMETERS,
            toolset="cron",
            always_available=False,
        )

    async def invoke(self, arguments: dict[str, Any], context: ToolContext) -> ToolResult:
        """Run the requested action and wrap its outcome in a ``ToolResult``.

        Args:
            arguments: Tool-call arguments; ``action`` selects the operation.
            context: Runtime context used to locate the cron service.

        Returns:
            A ``ToolResult`` whose ``content`` is the JSON-encoded result dict.
            Any exception raised while handling the action (or while encoding
            the result) is converted into a failed ``ToolResult`` instead of
            propagating.
        """
        # This is the tool boundary: nothing may escape to the caller, so the
        # broad ``except`` is deliberate and serialization stays inside ``try``.
        try:
            result = await self._invoke(arguments, context)
            return ToolResult(
                success=bool(result.get("success", True)),
                content=json.dumps(result, ensure_ascii=False),
                tool_name=self.spec.name,
                error=str(result.get("error")) if result.get("error") else None,
                raw_output=result,
            )
        except Exception as exc:
            return ToolResult(
                success=False,
                content=json.dumps({"success": False, "error": str(exc)}, ensure_ascii=False),
                tool_name=self.spec.name,
                error=str(exc),
            )

    async def _invoke(self, arguments: dict[str, Any], context: ToolContext) -> dict[str, Any]:
        """Dispatch ``arguments["action"]`` and return a plain result dict.

        Every returned dict carries ``success``; failed results also carry an
        ``error`` string so ``invoke`` can surface the reason.

        Raises:
            ValueError: If ``job_id`` is missing for remove/toggle/run, or
                ``message`` is missing for add.
            RuntimeError: If no cron service can be resolved from ``context``.
        """
        service = self._resolve_cron_service(context)
        action = str(arguments.get("action") or "").strip().lower()

        if action == "add":
            schedule = schedule_from_api(arguments)
            message = str(arguments.get("message") or "").strip()
            # The schema documents ``message`` as required for add; reject an
            # empty instruction here, mirroring the ``job_id`` check.
            if not message:
                raise ValueError("message is required for add")
            job = service.add_job(
                name=str(arguments.get("name") or "").strip(),
                message=message,
                schedule=schedule,
                # Fall back to the calling session; an empty key becomes None.
                session_key=str(arguments.get("session_key") or context.session_id or "").strip() or None,
                payload_kind="agent_turn",
                mode=str(arguments.get("mode") or "notification").strip().lower(),
                requires_followup=bool(arguments.get("requires_followup", False)),
            )
            return {"success": True, "job": job.to_api_dict()}

        if action == "list":
            include_disabled = bool(arguments.get("include_disabled", True))
            return {
                "success": True,
                "jobs": [job.to_api_dict() for job in service.list_jobs(include_disabled=include_disabled)],
            }

        if action == "remove":
            job_id = _required_job_id(arguments)
            if service.remove_job(job_id):
                return {"success": True, "job_id": job_id}
            # Explain the failure instead of returning a bare success=False.
            if service.get_job(job_id) is None:
                error = f"Scheduled task {job_id!r} was not found."
            else:
                error = f"Scheduled task {job_id!r} could not be removed."
            return {"success": False, "job_id": job_id, "error": error}

        if action == "toggle":
            job_id = _required_job_id(arguments)
            # ``enabled`` defaults to True, so a toggle without it enables the job.
            job = service.update_enabled(job_id, bool(arguments.get("enabled", True)))
            if job is None:
                return {"success": False, "error": f"Scheduled task {job_id!r} was not found."}
            return {"success": True, "job": job.to_api_dict()}

        if action == "run":
            job_id = _required_job_id(arguments)
            ok = await service.run_job(job_id, force=True)
            # Re-read the job after the run so its current state is reported.
            job = service.get_job(job_id)
            result: dict[str, Any] = {
                "success": ok,
                "job_id": job_id,
                "job": job.to_api_dict() if job is not None else None,
            }
            if not ok:
                # Explain the failure instead of returning a bare success=False.
                if job is None:
                    result["error"] = f"Scheduled task {job_id!r} was not found."
                else:
                    result["error"] = f"Scheduled task {job_id!r} did not run successfully."
            return result

        return {"success": False, "error": "action must be one of: add, list, remove, toggle, run"}

    @staticmethod
    def _resolve_cron_service(context: ToolContext) -> CronService:
        """Return the cron service to use for this invocation.

        Uses the ``cron_service`` entry from ``context`` when it is a
        ``CronService``; otherwise builds one from
        ``<workspace>/cron/jobs.json``.

        Raises:
            RuntimeError: If the context provides neither a service nor a
                workspace.
        """
        service = context.get("cron_service")
        if isinstance(service, CronService):
            return service
        if not context.workspace:
            raise RuntimeError("Cron service is unavailable for this runtime.")
        return CronService(f"{context.workspace}/cron/jobs.json")
def _required_job_id(arguments: dict[str, Any]) -> str:
job_id = str(arguments.get("job_id") or "").strip()
if not job_id:
raise ValueError("job_id is required")
return job_id