"""Tests for cron schedule parsing, the CronService job store, and CronTool."""

import asyncio
import threading

from beaver.foundation.models import CronExecutionResult, CronRunRecord, CronSchedule
from beaver.tools.base import ToolContext
from beaver.tools.builtins import CronTool
from beaver.services.cron_service import (
    CronService,
    compute_next_run,
    parse_schedule,
    schedule_from_api,
)


def test_parse_schedule_expressions() -> None:
    """``parse_schedule`` maps each textual form to the expected schedule kind."""
    interval = parse_schedule("every 15m")
    assert interval.kind == "every"
    assert interval.every_ms == 15 * 60 * 1000

    # A bare duration is parsed as a one-shot ("at") schedule.
    one_shot = parse_schedule("30s")
    assert one_shot.kind == "at"
    assert one_shot.at_ms is not None

    cron = parse_schedule("0 9 * * *")
    assert cron.kind == "cron"
    assert cron.expr == "0 9 * * *"


def test_schedule_from_frontend_payload() -> None:
    """``schedule_from_api`` accepts ``every_seconds`` and ``cron_expr`` payloads."""
    every = schedule_from_api({"every_seconds": 60})
    assert every.kind == "every"
    assert every.every_ms == 60_000

    cron = schedule_from_api({"cron_expr": "0 10 * * *"})
    assert cron.kind == "cron"


def test_legacy_interval_schedule_recovers_duration_from_display() -> None:
    """A stored interval with ``every_ms`` of None gets it back from ``display``."""
    schedule = CronSchedule.from_dict(
        {
            "kind": "every",
            "every_ms": None,
            "display": "every 1800s",
        }
    )

    assert schedule.every_ms == 30 * 60 * 1000


def test_compute_next_run_skips_missed_interval() -> None:
    """The next run of an interval schedule lies after ``now_ms``, not in the past."""
    schedule = CronSchedule(kind="every", every_ms=60_000)

    assert compute_next_run(schedule, now_ms=1_000_000, last_run_at_ms=0) > 1_000_000


def test_manual_run_records_task_history(tmp_path) -> None:
    """A forced run stores the callback's ``task_id`` on the job and its history."""

    async def on_job(job):
        return CronExecutionResult(response="done", task_id=f"task-{job.id}", run_id="run-1")

    service = CronService(tmp_path / "jobs.json", on_job=on_job)
    job = service.add_job(
        name="Daily check",
        message="Check the project",
        schedule=CronSchedule(kind="every", every_ms=3600_000),
        session_key="web:default",
    )

    assert asyncio.run(service.run_job(job.id, force=True)) is True

    updated = service.get_job(job.id)
    assert updated is not None
    assert updated.last_status == "ok"
    assert updated.history[-1].task_id == f"task-{job.id}"
    assert updated.to_api_dict()["last_task_id"] == f"task-{job.id}"


def test_manual_run_records_scheduled_run_output(tmp_path) -> None:
    """A forced run records the callback's output and notification session."""

    # This callback takes the run record as a second argument, unlike the
    # single-argument callback used in the task-history test.
    async def on_job(job, run):
        return CronExecutionResult(
            response=f"notification for {run.scheduled_run_id}",
            run_id="run-notify",
            notification_session_id="notify:default:scheduled",
            mode="notification",
        )

    service = CronService(tmp_path / "jobs.json", on_job=on_job)
    job = service.add_job(
        name="Daily news",
        message="Summarize news",
        schedule=CronSchedule(kind="every", every_ms=3600_000),
    )

    assert asyncio.run(service.run_job(job.id, force=True)) is True

    updated = service.get_job(job.id)
    assert updated is not None
    run = updated.history[-1]
    assert run.scheduled_run_id
    assert run.output == f"notification for {run.scheduled_run_id}"
    assert run.notification_session_id == "notify:default:scheduled"
    assert updated.to_api_dict()["last_scheduled_run_id"] == run.scheduled_run_id


def test_persisted_interval_job_keeps_schedule_and_next_run(tmp_path) -> None:
    """A second service on the same store file sees the same interval and next run."""
    store_path = tmp_path / "jobs.json"
    service = CronService(store_path)
    job = service.add_job(
        name="Hydration reminder",
        message="Drink water",
        schedule=CronSchedule(kind="every", every_ms=30 * 60 * 1000),
    )

    reloaded = CronService(store_path).get_job(job.id)

    assert reloaded is not None
    assert reloaded.schedule.every_ms == 30 * 60 * 1000
    assert reloaded.next_run_at_ms == job.next_run_at_ms


def test_running_scheduler_can_disable_job_without_deadlock(tmp_path) -> None:
    """``update_enabled`` returns promptly while the service is flagged as running.

    The call is made on a worker thread so that a hang shows up as a timeout
    on ``completed`` instead of blocking the test run.
    """
    service = CronService(tmp_path / "jobs.json")
    job = service.add_job(
        name="Hydration reminder",
        message="Drink water",
        schedule=CronSchedule(kind="every", every_ms=30 * 60 * 1000),
    )
    # Flip the private flag directly instead of starting the real scheduler.
    service._running = True
    completed = threading.Event()
    enabled_values: list[bool] = []
    errors: list[Exception] = []

    def disable_job() -> None:
        try:
            updated = service.update_enabled(job.id, False)
            if updated is not None:
                enabled_values.append(updated.enabled)
        except Exception as exc:
            # Re-raised on the main thread below so the real failure is reported.
            errors.append(exc)
        finally:
            # Always signal completion; otherwise an exception in the worker
            # would be misreported as a deadlock by the timeout assertion.
            completed.set()

    worker = threading.Thread(target=disable_job, daemon=True)
    worker.start()

    assert completed.wait(0.5), "disabling a running cron job should not deadlock"
    if errors:
        raise errors[0]
    assert enabled_values == [False]
    current = service.get_job(job.id)
    assert current is not None
    assert current.enabled is False


def test_cron_tool_uses_runtime_service(tmp_path) -> None:
    """``CronTool`` adds the job to the service supplied through ``ToolContext``."""
    service = CronService(tmp_path / "jobs.json")
    tool = CronTool()

    result = asyncio.run(
        tool.invoke(
            {
                "action": "add",
                "name": "Tool-created task",
                "message": "Check the queue",
                "every_seconds": 300,
            },
            ToolContext(session_id="session-1", services={"cron_service": service}),
        )
    )

    assert result.success is True
    jobs = service.list_jobs(include_disabled=True)
    assert len(jobs) == 1
    # The job's session key is taken from the tool context's session id.
    assert jobs[0].payload.session_key == "session-1"


def test_mark_run_engaged_links_task(tmp_path) -> None:
    """``mark_run_engaged`` flags the run as engaged and attaches the task id."""
    service = CronService(tmp_path / "jobs.json")
    job = service.add_job(
        name="Daily news",
        message="Summarize news",
        schedule=CronSchedule(kind="every", every_ms=3600_000),
    )
    run = CronRunRecord(
        started_at_ms=1,
        status="ok",
        output="news summary",
        notification_session_id="notify:default:scheduled",
    )
    # Inject the run record by hand and persist it, without executing the job.
    job.history.append(run)
    service._save_jobs()

    linked = service.mark_run_engaged(run.scheduled_run_id, task_id="task-1", intent="revise_once")

    assert linked is not None
    updated = service.get_run(run.scheduled_run_id)
    assert updated is not None
    # Index 1 of the get_run result is the run record checked here.
    assert updated[1].engaged is True
    assert updated[1].task_id == "task-1"