"""Shell execution tool.""" import asyncio import os import re import shlex from pathlib import Path from typing import Any from nanobot.agent.tools.base import Tool class ExecTool(Tool): """Tool to execute shell commands.""" def __init__( self, timeout: int = 60, working_dir: str | None = None, deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, protected_paths: list[Path] | None = None, ): self.timeout = timeout self.working_dir = working_dir self.deny_patterns = deny_patterns or [ r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr r"\bdel\s+/[fq]\b", # del /f, del /q r"\brmdir\s+/s\b", # rmdir /s r"(?:^|[;&|]\s*)format\b", # format (as standalone command only) r"\b(mkfs|diskpart)\b", # disk operations r"\bdd\s+if=", # dd r">\s*/dev/sd", # write to disk r"\b(shutdown|reboot|poweroff)\b", # system power r":\(\)\s*\{.*\};\s*:", # fork bomb ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace self.protected_paths = [Path(p).expanduser().resolve() for p in protected_paths or []] @property def name(self) -> str: return "exec" @property def description(self) -> str: return "Execute a shell command and return its output. Use with caution." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to execute" }, "working_dir": { "type": "string", "description": "Optional working directory for the command" } }, "required": ["command"] } async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str: cwd = working_dir or self.working_dir or os.getcwd() guard_error = self._guard_command(command, cwd) if guard_error: return guard_error try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout ) except asyncio.TimeoutError: process.kill() # Wait for the process to fully terminate so pipes are # drained and file descriptors are released. try: await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: pass return f"Error: Command timed out after {self.timeout} seconds" output_parts = [] if stdout: output_parts.append(stdout.decode("utf-8", errors="replace")) if stderr: stderr_text = stderr.decode("utf-8", errors="replace") if stderr_text.strip(): output_parts.append(f"STDERR:\n{stderr_text}") if process.returncode != 0: output_parts.append(f"\nExit code: {process.returncode}") result = "\n".join(output_parts) if output_parts else "(no output)" # Truncate very long output max_len = 10000 if len(result) > max_len: result = result[:max_len] + f"\n... (truncated, {len(result) - max_len} more chars)" return result except Exception as e: return f"Error executing command: {str(e)}" def _guard_command(self, command: str, cwd: str) -> str | None: """Best-effort safety guard for potentially destructive commands.""" cmd = command.strip() lower = cmd.lower() for pattern in self.deny_patterns: if re.search(pattern, lower): return "Error: Command blocked by safety guard (dangerous pattern detected)" if self.allow_patterns: if not any(re.search(p, lower) for p in self.allow_patterns): return "Error: Command blocked by safety guard (not in allowlist)" if self.restrict_to_workspace: if "..\\" in cmd or "../" in cmd: return "Error: Command blocked by safety guard (path traversal detected)" cwd_path = Path(cwd).resolve() win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd) # Only match absolute paths — avoid false positives on relative # paths like ".venv/bin/python" where "/bin/python" would be # incorrectly extracted by the old pattern. posix_paths = re.findall(r"(?:^|[\s|>])(/[^\s\"'>]+)", cmd) for raw in win_paths + posix_paths: try: p = Path(raw.strip()).resolve() except Exception: continue if p.is_absolute() and cwd_path not in p.parents and p != cwd_path: return "Error: Command blocked by safety guard (path outside working dir)" protected_error = self._guard_protected_paths(command, cwd) if protected_error: return protected_error return None def _guard_protected_paths(self, command: str, cwd: str) -> str | None: if not self.protected_paths: return None cwd_path = Path(cwd).expanduser().resolve() if self._is_blocked_clawhub_install(command, cwd_path): return self._protected_write_error() if not self._looks_like_write(command): return None for raw in self._extract_path_tokens(command): resolved = self._resolve_command_path(raw, cwd_path) if resolved and any(self._is_relative_to(resolved, root) for root in self.protected_paths): return self._protected_write_error() return None def _is_blocked_clawhub_install(self, command: str, cwd_path: Path) -> bool: lower = command.lower() if "clawhub" not in lower or not re.search(r"\b(install|update)\b", lower): return False workdir = self._extract_flag_value(command, "--workdir") if workdir: resolved = self._resolve_command_path(workdir, cwd_path) return any( resolved == root.parent or self._is_relative_to(root, resolved) for root in self.protected_paths ) return any(cwd_path == root.parent for root in self.protected_paths) @staticmethod def _protected_write_error() -> str: return ( "Error: Direct writes to workspace skills are blocked. " "Stage the skill for review and require explicit user approval before installation." ) @staticmethod def _is_relative_to(path: Path, root: Path) -> bool: try: path.relative_to(root) return True except ValueError: return False @staticmethod def _extract_flag_value(command: str, flag: str) -> str | None: tokens = ExecTool._tokenize(command) for i, token in enumerate(tokens): if token == flag and i + 1 < len(tokens): return tokens[i + 1] if token.startswith(flag + "="): return token.split("=", 1)[1] return None @staticmethod def _looks_like_write(command: str) -> bool: lower = command.lower() if re.search(r"(^|[^<])>>?\s*\S+", command): return True if re.search(r"\bsed\s+-i(?:\s|$)", lower): return True return bool(re.search( r"\b(cp|mv|rm|mkdir|touch|install|tee|tar|unzip|zip|chmod|chown|git|python|python3|node|npx|bash|sh|zsh|pwsh|powershell)\b", lower, )) @staticmethod def _extract_path_tokens(command: str) -> list[str]: tokens = ExecTool._tokenize(command) path_tokens: list[str] = [] skip_next = False for i, token in enumerate(tokens): if skip_next: skip_next = False continue if token in {"--workdir", "-C"}: if i + 1 < len(tokens): path_tokens.append(tokens[i + 1]) skip_next = True continue if "=" in token: key, value = token.split("=", 1) if key in {"--workdir"}: path_tokens.append(value) continue cleaned = token.strip("\"'") if ExecTool._looks_like_path_token(cleaned): path_tokens.append(cleaned) return path_tokens @staticmethod def _looks_like_path_token(token: str) -> bool: if not token or token in {".", ".."}: return True if token.startswith(("~", "/", "./", "../")): return True if re.match(r"^[A-Za-z]:\\", token): return True return "/" in token or "\\" in token @staticmethod def _resolve_command_path(raw: str, cwd_path: Path) -> Path | None: token = raw.strip().strip("\"'") if not token: return None try: path = Path(token).expanduser() if not path.is_absolute(): path = (cwd_path / path).resolve() else: path = path.resolve() return path except Exception: return None @staticmethod def _tokenize(command: str) -> list[str]: try: return shlex.split(command, posix=os.name != "nt") except ValueError: return command.split()