"""cron 任务运行时辅助逻辑。 这里负责把已经到点的 `CronJob` 真正翻译成一次可执行动作: 1. 纯提醒型任务:直接向目标会话投递消息; 2. agent task 型任务:构造自动执行上下文,再交给 `AgentLoop.process_direct()`; 3. 额外注入 `cron_action` 工具,让模型可以反向控制后续调度。 """ from __future__ import annotations from typing import Any from nanobot.agent.tools.cron_action import CronActionTool from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.cron.types import CronExecutionResult, CronJob async def _deliver_response( bus: MessageBus, *, channel: str, chat_id: str, content: str | None, ) -> None: # cron 统一通过 outbound 消息回到现有渠道层,避免绕开原有发送链路。 await bus.publish_outbound(OutboundMessage( channel=channel, chat_id=chat_id, content=content or "", )) def _describe_schedule(job: CronJob) -> str: """把调度对象转成面向模型的简短文本。""" if job.schedule.kind == "every": every_ms = job.schedule.every_ms or 0 return f"every {every_ms // 1000}s" if job.schedule.kind == "cron": return job.schedule.expr or "cron" return "one-time" def _resolve_session_key(job: CronJob) -> str: """为 cron task 选择一个应复用的会话 key。""" # 优先使用显式记录的 session_key,这样任务型 cron 可以延续原短期上下文。 if job.payload.session_key: return job.payload.session_key # 如果老数据没有 session_key,但有 channel/to,则退化为路由键。 if job.payload.channel and job.payload.to: return f"{job.payload.channel}:{job.payload.to}" # 再兜底到 cron 自己的命名空间,保证始终能生成稳定 key。 return f"cron:{job.id}" def _build_execution_context(job: CronJob, session_key: str) -> str: """构造注入给 agent 的自动执行上下文说明。""" schedule = _describe_schedule(job) return f"""This turn was triggered automatically by a scheduled cron job. Job ID: {job.id} Job Name: {job.name} Schedule: {schedule} Origin Session: {session_key} You are in autonomous scheduled-task mode: - This is not an interactive user turn. - Do not ask the user what to do next. - Execute the task, make the necessary tool calls, and report the concrete outcome. - If the task has reached a terminal condition, natural stopping point, or no longer needs future runs, emit a structured cron_action tool call instead of only describing it in text. - Use cron_action(action="complete_today", reason="...") when today's batch is complete and the job should resume next cycle. - Use cron_action(action="remove", reason="...") to delete the current job permanently. - Use cron_action(action="disable", reason="...") to stop the current job without deleting it. - Use cron_action(action="reschedule", ...) to change the current job's schedule deterministically. - Use the regular cron tool only if you truly need to inspect or manage additional jobs beyond the current one. """ async def run_cron_job( job: CronJob, *, agent: Any, bus: MessageBus, default_channel: str, default_chat_id: str, ) -> CronExecutionResult: """Execute one cron job according to its payload kind.""" # deliver 目标允许任务使用自己的渠道配置,否则落回默认 web 会话。 channel = job.payload.channel or default_channel chat_id = job.payload.to or default_chat_id if job.payload.kind == "system_event": # 提醒模式不需要再过一层 agent 推理,直接把原消息投递给目标会话。 message = job.payload.message if job.payload.deliver and job.payload.to: await _deliver_response(bus, channel=channel, chat_id=job.payload.to, content=message) return CronExecutionResult(response=message) # task 模式会进入 agent 主循环,因此要准备复用的 session key 和运行说明。 session_key = _resolve_session_key(job) execution_context = _build_execution_context(job, session_key) # 把 cron_action 作为“附加工具”注入,仅对当前这次 cron 执行生效。 action_tool = CronActionTool(job.id) response = await agent.process_direct( content=job.payload.message, session_key=session_key, channel=channel, chat_id=chat_id, execution_context=execution_context, extra_tools=[action_tool], ) # 若任务要求把最终结果投递出去,则沿用正常 outbound 消息链路。 if job.payload.deliver and job.payload.to: await _deliver_response(bus, channel=channel, chat_id=job.payload.to, content=response) # runtime 同时返回文本结果和结构化动作,供 CronService 后续处理。 return CronExecutionResult(response=response, action=action_tool.decision)