- 集成MCP连接管理器,支持MCP服务器连接 - 添加多种内置工具:ClarifyTool、CronTool、DelegateTool、ExecuteCodeTool、 PatchFileTool、ProcessTool、SendMessageTool、SpawnTool、TerminalTool、 TodoTool、WebFetchTool、WebSearchTool、WriteFileTool等 - 实现工具注册和装配功能 - 添加技能选择上下文参数 - 支持思考模式控制参数thinking_enabled feat(coordinator): 重构任务执行计划器参数命名 - 将learning_candidate_enabled重命名为allow_candidate_generation - 更新TeamGraphScheduler中的参数传递 - 修改LocalAgentRunner中的相关参数处理 - 更新README文档中的相应描述 refactor(context): 标准化工具调用参数格式 - 添加_json导入用于参数序列化 - 实现_provider_tool_calls方法标准化OpenAI兼容的工具调用载荷 - 修复工具调用中参数非字符串类型的序列化问题 refactor(session): 优化消息历史记录过滤逻辑 - 修改get_messages_as_conversation为基于运行状态过滤消息 - 排除未完成、失败或错误结束的运行记录 - 改进对话历史的可见性控制机制 fix(store): 修复FTS索引重建逻辑 - 添加异常处理防止FTS索引创建失败 - 实现_rebuild_fts_index方法重新构建全文搜索索引 - 优化索引触发器和表的维护流程
120 lines
4.6 KiB
Python
120 lines
4.6 KiB
Python
"""File-backed internal task store."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .models import TaskEvent, TaskRecord
|
|
|
|
|
|
class TaskStore:
|
|
def __init__(self, root: str | Path) -> None:
|
|
self.root = Path(root)
|
|
self.root.mkdir(parents=True, exist_ok=True)
|
|
self.tasks_path = self.root / "tasks.json"
|
|
self.events_path = self.root / "events.jsonl"
|
|
self._lock = threading.Lock()
|
|
|
|
def list_tasks(self) -> list[TaskRecord]:
|
|
with self._lock:
|
|
payload = self._read_tasks_unlocked()
|
|
return [TaskRecord.from_dict(item) for item in payload.values()]
|
|
|
|
def get_task(self, task_id: str) -> TaskRecord | None:
|
|
with self._lock:
|
|
payload = self._read_tasks_unlocked().get(task_id)
|
|
return TaskRecord.from_dict(payload) if isinstance(payload, dict) else None
|
|
|
|
def get_task_by_run_id(self, run_id: str) -> TaskRecord | None:
|
|
for task in self.list_tasks():
|
|
if run_id in task.run_ids:
|
|
return task
|
|
return None
|
|
|
|
def get_latest_open_task(self, session_id: str) -> TaskRecord | None:
|
|
tasks = [
|
|
task
|
|
for task in self.list_tasks()
|
|
if task.session_id == session_id and task.is_open
|
|
]
|
|
if not tasks:
|
|
return None
|
|
return sorted(tasks, key=lambda item: item.updated_at)[-1]
|
|
|
|
def upsert_task(self, task: TaskRecord) -> None:
|
|
with self._lock:
|
|
payload = self._read_tasks_unlocked()
|
|
payload[task.task_id] = task.to_dict()
|
|
self._write_tasks_unlocked(payload)
|
|
|
|
def delete_task(self, task_id: str) -> bool:
|
|
with self._lock:
|
|
payload = self._read_tasks_unlocked()
|
|
if task_id not in payload:
|
|
return False
|
|
payload.pop(task_id, None)
|
|
self._write_tasks_unlocked(payload)
|
|
if self.events_path.exists():
|
|
kept = []
|
|
for line in self.events_path.read_text(encoding="utf-8").splitlines():
|
|
cleaned = line.strip()
|
|
if not cleaned:
|
|
continue
|
|
event_payload = json.loads(cleaned)
|
|
if not isinstance(event_payload, dict) or str(event_payload.get("task_id")) != task_id:
|
|
kept.append(cleaned)
|
|
self.events_path.write_text(("\n".join(kept) + "\n") if kept else "", encoding="utf-8")
|
|
return True
|
|
|
|
def append_event(self, event: TaskEvent) -> None:
|
|
self.events_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with self._lock:
|
|
with self.events_path.open("a", encoding="utf-8") as handle:
|
|
handle.write(json.dumps(event.to_dict(), ensure_ascii=False, sort_keys=True) + "\n")
|
|
|
|
def list_events(self, task_id: str | None = None) -> list[TaskEvent]:
|
|
if not self.events_path.exists():
|
|
return []
|
|
results: list[TaskEvent] = []
|
|
for line in self.events_path.read_text(encoding="utf-8").splitlines():
|
|
cleaned = line.strip()
|
|
if not cleaned:
|
|
continue
|
|
payload = json.loads(cleaned)
|
|
if not isinstance(payload, dict):
|
|
continue
|
|
event = TaskEvent.from_dict(payload)
|
|
if task_id is not None and event.task_id != task_id:
|
|
continue
|
|
results.append(event)
|
|
return results
|
|
|
|
def _read_tasks_unlocked(self) -> dict[str, dict[str, Any]]:
|
|
if not self.tasks_path.exists():
|
|
return {}
|
|
payload = json.loads(self.tasks_path.read_text(encoding="utf-8"))
|
|
if not isinstance(payload, dict):
|
|
return {}
|
|
tasks = payload.get("tasks", payload)
|
|
if not isinstance(tasks, dict):
|
|
return {}
|
|
return {str(key): dict(value) for key, value in tasks.items() if isinstance(value, dict)}
|
|
|
|
def _write_tasks_unlocked(self, payload: dict[str, dict[str, Any]]) -> None:
|
|
self.tasks_path.parent.mkdir(parents=True, exist_ok=True)
|
|
fd, tmp_name = tempfile.mkstemp(prefix=".tasks-", suffix=".json", dir=str(self.tasks_path.parent))
|
|
tmp_path = Path(tmp_name)
|
|
try:
|
|
with os.fdopen(fd, "w", encoding="utf-8") as handle:
|
|
json.dump({"tasks": payload}, handle, ensure_ascii=False, indent=2, sort_keys=True)
|
|
handle.write("\n")
|
|
os.replace(tmp_path, self.tasks_path)
|
|
finally:
|
|
if tmp_path.exists():
|
|
tmp_path.unlink()
|