"""Local terminal and background process tools.""" from __future__ import annotations import asyncio from dataclasses import dataclass, field import json from pathlib import Path import sys from typing import Any from uuid import uuid4 def _json_result(success: bool, **payload: Any) -> str: return json.dumps({"success": success, **payload}, ensure_ascii=False, indent=2) class BackgroundProcessStore: def __init__(self) -> None: self._processes: dict[str, asyncio.subprocess.Process] = {} self._logs: dict[str, bytes] = {} async def start(self, command: str, cwd: str | None = None) -> str: process_id = uuid4().hex[:12] proc = await asyncio.create_subprocess_shell( command, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) self._processes[process_id] = proc self._logs[process_id] = b"" asyncio.create_task(self._drain(process_id, proc)) return process_id async def _drain(self, process_id: str, proc: asyncio.subprocess.Process) -> None: if proc.stdout is None: return while True: chunk = await proc.stdout.read(4096) if not chunk: break self._logs[process_id] = (self._logs.get(process_id, b"") + chunk)[-200_000:] def list(self) -> list[dict[str, Any]]: rows = [] for process_id, proc in self._processes.items(): rows.append({"process_id": process_id, "returncode": proc.returncode, "running": proc.returncode is None}) return rows def log(self, process_id: str, limit: int = 12000) -> str: return self._logs.get(process_id, b"")[-limit:].decode("utf-8", errors="replace") async def kill(self, process_id: str) -> bool: proc = self._processes.get(process_id) if proc is None: return False if proc.returncode is None: proc.terminate() try: await asyncio.wait_for(proc.wait(), timeout=5) except asyncio.TimeoutError: proc.kill() await proc.wait() return True GLOBAL_PROCESS_STORE = BackgroundProcessStore() def _workspace_cwd(workspace: str | None, working_dir: str | None) -> str | None: if not workspace: return None root = Path(workspace).expanduser().resolve() raw = Path(working_dir or ".").expanduser() candidate = raw if raw.is_absolute() else root / raw resolved = candidate.resolve() resolved.relative_to(root) return str(resolved) @dataclass(slots=True) class TerminalTool: name: str = "terminal" description: str = "Execute a shell command. Set background=true for long-running commands." toolset: str = "terminal" always_available: bool = False parameters: dict[str, Any] = field( default_factory=lambda: { "type": "object", "properties": { "command": {"type": "string"}, "working_dir": {"type": "string", "default": "."}, "timeout": {"type": "integer", "default": 60, "minimum": 1, "maximum": 600}, "background": {"type": "boolean", "default": False}, }, "required": ["command"], } ) async def execute( self, *, command: str, working_dir: str | None = None, timeout: int = 60, background: bool = False, workspace: str | None = None, ) -> str: try: if not command.strip(): raise ValueError("command is required") cwd = _workspace_cwd(workspace, working_dir) if background: process_id = await GLOBAL_PROCESS_STORE.start(command, cwd=cwd) return _json_result(True, process_id=process_id, background=True) proc = await asyncio.create_subprocess_shell( command, cwd=cwd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) output, _ = await asyncio.wait_for(proc.communicate(), timeout=max(1, min(int(timeout or 60), 600))) text = output.decode("utf-8", errors="replace") return _json_result(True, returncode=proc.returncode, output=text[-50000:]) except Exception as exc: return _json_result(False, error=str(exc)) @dataclass(slots=True) class ProcessTool: name: str = "process" description: str = "Manage background processes started with terminal(background=true)." toolset: str = "terminal" always_available: bool = False parameters: dict[str, Any] = field( default_factory=lambda: { "type": "object", "properties": { "action": {"type": "string", "enum": ["list", "log", "kill"]}, "process_id": {"type": "string"}, }, "required": ["action"], } ) async def execute(self, *, action: str, process_id: str | None = None, **_: Any) -> str: if action == "list": return _json_result(True, processes=GLOBAL_PROCESS_STORE.list()) if action == "log": if not process_id: return _json_result(False, error="process_id is required") return _json_result(True, process_id=process_id, output=GLOBAL_PROCESS_STORE.log(process_id)) if action == "kill": if not process_id: return _json_result(False, error="process_id is required") return _json_result(await GLOBAL_PROCESS_STORE.kill(process_id), process_id=process_id) return _json_result(False, error=f"unknown action: {action}") @dataclass(slots=True) class ExecuteCodeTool: name: str = "execute_code" description: str = "Execute small Python snippets locally without external APIs." toolset: str = "terminal" always_available: bool = False parameters: dict[str, Any] = field( default_factory=lambda: { "type": "object", "properties": { "language": {"type": "string", "enum": ["python"], "default": "python"}, "code": {"type": "string"}, "timeout": {"type": "integer", "default": 30, "minimum": 1, "maximum": 120}, "working_dir": {"type": "string", "default": "."}, }, "required": ["code"], } ) async def execute( self, *, code: str, language: str = "python", timeout: int = 30, working_dir: str | None = None, workspace: str | None = None, ) -> str: try: if language != "python": raise ValueError("Only python is supported") cwd = _workspace_cwd(workspace, working_dir) proc = await asyncio.create_subprocess_exec( sys.executable, "-I", "-", cwd=cwd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) output, _ = await asyncio.wait_for( proc.communicate(code.encode("utf-8")), timeout=max(1, min(int(timeout or 30), 120)), ) return _json_result( True, language="python", returncode=proc.returncode, output=output.decode("utf-8", errors="replace")[-50000:], ) except Exception as exc: return _json_result(False, error=str(exc))