"""File-backed internal task store.""" from __future__ import annotations import json import os import tempfile import threading from pathlib import Path from typing import Any from .models import TaskEvent, TaskRecord class TaskStore: def __init__(self, root: str | Path) -> None: self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) self.tasks_path = self.root / "tasks.json" self.events_path = self.root / "events.jsonl" self._lock = threading.Lock() def list_tasks(self) -> list[TaskRecord]: with self._lock: payload = self._read_tasks_unlocked() return [TaskRecord.from_dict(item) for item in payload.values()] def get_task(self, task_id: str) -> TaskRecord | None: with self._lock: payload = self._read_tasks_unlocked().get(task_id) return TaskRecord.from_dict(payload) if isinstance(payload, dict) else None def get_task_by_run_id(self, run_id: str) -> TaskRecord | None: for task in self.list_tasks(): if run_id in task.run_ids: return task return None def get_latest_open_task(self, session_id: str) -> TaskRecord | None: tasks = [ task for task in self.list_tasks() if task.session_id == session_id and task.is_open ] if not tasks: return None return sorted(tasks, key=lambda item: item.updated_at)[-1] def upsert_task(self, task: TaskRecord) -> None: with self._lock: payload = self._read_tasks_unlocked() payload[task.task_id] = task.to_dict() self._write_tasks_unlocked(payload) def delete_task(self, task_id: str) -> bool: with self._lock: payload = self._read_tasks_unlocked() if task_id not in payload: return False payload.pop(task_id, None) self._write_tasks_unlocked(payload) if self.events_path.exists(): kept = [] for line in self.events_path.read_text(encoding="utf-8").splitlines(): cleaned = line.strip() if not cleaned: continue event_payload = json.loads(cleaned) if not isinstance(event_payload, dict) or str(event_payload.get("task_id")) != task_id: kept.append(cleaned) self.events_path.write_text(("\n".join(kept) + "\n") if kept else "", encoding="utf-8") return True def append_event(self, event: TaskEvent) -> None: self.events_path.parent.mkdir(parents=True, exist_ok=True) with self._lock: with self.events_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(event.to_dict(), ensure_ascii=False, sort_keys=True) + "\n") def list_events(self, task_id: str | None = None) -> list[TaskEvent]: if not self.events_path.exists(): return [] results: list[TaskEvent] = [] for line in self.events_path.read_text(encoding="utf-8").splitlines(): cleaned = line.strip() if not cleaned: continue payload = json.loads(cleaned) if not isinstance(payload, dict): continue event = TaskEvent.from_dict(payload) if task_id is not None and event.task_id != task_id: continue results.append(event) return results def _read_tasks_unlocked(self) -> dict[str, dict[str, Any]]: if not self.tasks_path.exists(): return {} payload = json.loads(self.tasks_path.read_text(encoding="utf-8")) if not isinstance(payload, dict): return {} tasks = payload.get("tasks", payload) if not isinstance(tasks, dict): return {} return {str(key): dict(value) for key, value in tasks.items() if isinstance(value, dict)} def _write_tasks_unlocked(self, payload: dict[str, dict[str, Any]]) -> None: self.tasks_path.parent.mkdir(parents=True, exist_ok=True) fd, tmp_name = tempfile.mkstemp(prefix=".tasks-", suffix=".json", dir=str(self.tasks_path.parent)) tmp_path = Path(tmp_name) try: with os.fdopen(fd, "w", encoding="utf-8") as handle: json.dump({"tasks": payload}, handle, ensure_ascii=False, indent=2, sort_keys=True) handle.write("\n") os.replace(tmp_path, self.tasks_path) finally: if tmp_path.exists(): tmp_path.unlink()