"""Heartbeat service - periodic agent wake-up to check for tasks.""" import asyncio from pathlib import Path from typing import Any, Callable, Coroutine from loguru import logger # Default interval: 30 minutes DEFAULT_HEARTBEAT_INTERVAL_S = 30 * 60 # Token the agent replies with when there is nothing to report HEARTBEAT_OK_TOKEN = "HEARTBEAT_OK" # The prompt sent to agent during heartbeat HEARTBEAT_PROMPT = ( "Read HEARTBEAT.md in your workspace and follow any instructions listed there. " f"If nothing needs attention, reply with exactly: {HEARTBEAT_OK_TOKEN}" ) def _is_heartbeat_empty(content: str | None) -> bool: """Check if HEARTBEAT.md has no actionable content.""" if not content: return True # Lines to skip: empty, headers, HTML comments, empty checkboxes skip_patterns = {"- [ ]", "* [ ]", "- [x]", "* [x]"} for line in content.split("\n"): line = line.strip() if not line or line.startswith("#") or line.startswith("