285 lines
10 KiB
Python
285 lines
10 KiB
Python
"""Shell execution tool."""
|
|
|
|
import asyncio
|
|
import os
|
|
import re
|
|
import shlex
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from nanobot.agent.tools.base import Tool
|
|
|
|
|
|
class ExecTool(Tool):
|
|
"""Tool to execute shell commands."""
|
|
|
|
def __init__(
|
|
self,
|
|
timeout: int = 60,
|
|
working_dir: str | None = None,
|
|
deny_patterns: list[str] | None = None,
|
|
allow_patterns: list[str] | None = None,
|
|
restrict_to_workspace: bool = False,
|
|
protected_paths: list[Path] | None = None,
|
|
):
|
|
self.timeout = timeout
|
|
self.working_dir = working_dir
|
|
self.deny_patterns = deny_patterns or [
|
|
r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr
|
|
r"\bdel\s+/[fq]\b", # del /f, del /q
|
|
r"\brmdir\s+/s\b", # rmdir /s
|
|
r"(?:^|[;&|]\s*)format\b", # format (as standalone command only)
|
|
r"\b(mkfs|diskpart)\b", # disk operations
|
|
r"\bdd\s+if=", # dd
|
|
r">\s*/dev/sd", # write to disk
|
|
r"\b(shutdown|reboot|poweroff)\b", # system power
|
|
r":\(\)\s*\{.*\};\s*:", # fork bomb
|
|
]
|
|
self.allow_patterns = allow_patterns or []
|
|
self.restrict_to_workspace = restrict_to_workspace
|
|
self.protected_paths = [Path(p).expanduser().resolve() for p in protected_paths or []]
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "exec"
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
return "Execute a shell command and return its output. Use with caution."
|
|
|
|
@property
|
|
def parameters(self) -> dict[str, Any]:
|
|
return {
|
|
"type": "object",
|
|
"properties": {
|
|
"command": {
|
|
"type": "string",
|
|
"description": "The shell command to execute"
|
|
},
|
|
"working_dir": {
|
|
"type": "string",
|
|
"description": "Optional working directory for the command"
|
|
}
|
|
},
|
|
"required": ["command"]
|
|
}
|
|
|
|
async def execute(self, command: str, working_dir: str | None = None, **kwargs: Any) -> str:
|
|
cwd = working_dir or self.working_dir or os.getcwd()
|
|
guard_error = self._guard_command(command, cwd)
|
|
if guard_error:
|
|
return guard_error
|
|
|
|
try:
|
|
process = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=cwd,
|
|
)
|
|
|
|
try:
|
|
stdout, stderr = await asyncio.wait_for(
|
|
process.communicate(),
|
|
timeout=self.timeout
|
|
)
|
|
except asyncio.TimeoutError:
|
|
process.kill()
|
|
# Wait for the process to fully terminate so pipes are
|
|
# drained and file descriptors are released.
|
|
try:
|
|
await asyncio.wait_for(process.wait(), timeout=5.0)
|
|
except asyncio.TimeoutError:
|
|
pass
|
|
return f"Error: Command timed out after {self.timeout} seconds"
|
|
|
|
output_parts = []
|
|
|
|
if stdout:
|
|
output_parts.append(stdout.decode("utf-8", errors="replace"))
|
|
|
|
if stderr:
|
|
stderr_text = stderr.decode("utf-8", errors="replace")
|
|
if stderr_text.strip():
|
|
output_parts.append(f"STDERR:\n{stderr_text}")
|
|
|
|
if process.returncode != 0:
|
|
output_parts.append(f"\nExit code: {process.returncode}")
|
|
|
|
result = "\n".join(output_parts) if output_parts else "(no output)"
|
|
|
|
# Truncate very long output
|
|
max_len = 10000
|
|
if len(result) > max_len:
|
|
result = result[:max_len] + f"\n... (truncated, {len(result) - max_len} more chars)"
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
return f"Error executing command: {str(e)}"
|
|
|
|
def _guard_command(self, command: str, cwd: str) -> str | None:
|
|
"""Best-effort safety guard for potentially destructive commands."""
|
|
cmd = command.strip()
|
|
lower = cmd.lower()
|
|
|
|
for pattern in self.deny_patterns:
|
|
if re.search(pattern, lower):
|
|
return "Error: Command blocked by safety guard (dangerous pattern detected)"
|
|
|
|
if self.allow_patterns:
|
|
if not any(re.search(p, lower) for p in self.allow_patterns):
|
|
return "Error: Command blocked by safety guard (not in allowlist)"
|
|
|
|
if self.restrict_to_workspace:
|
|
if "..\\" in cmd or "../" in cmd:
|
|
return "Error: Command blocked by safety guard (path traversal detected)"
|
|
|
|
cwd_path = Path(cwd).resolve()
|
|
|
|
win_paths = re.findall(r"[A-Za-z]:\\[^\\\"']+", cmd)
|
|
# Only match absolute paths — avoid false positives on relative
|
|
# paths like ".venv/bin/python" where "/bin/python" would be
|
|
# incorrectly extracted by the old pattern.
|
|
posix_paths = re.findall(r"(?:^|[\s|>])(/[^\s\"'>]+)", cmd)
|
|
|
|
for raw in win_paths + posix_paths:
|
|
try:
|
|
p = Path(raw.strip()).resolve()
|
|
except Exception:
|
|
continue
|
|
if p.is_absolute() and cwd_path not in p.parents and p != cwd_path:
|
|
return "Error: Command blocked by safety guard (path outside working dir)"
|
|
|
|
protected_error = self._guard_protected_paths(command, cwd)
|
|
if protected_error:
|
|
return protected_error
|
|
|
|
return None
|
|
|
|
def _guard_protected_paths(self, command: str, cwd: str) -> str | None:
|
|
if not self.protected_paths:
|
|
return None
|
|
|
|
cwd_path = Path(cwd).expanduser().resolve()
|
|
if self._is_blocked_clawhub_install(command, cwd_path):
|
|
return self._protected_write_error()
|
|
|
|
if not self._looks_like_write(command):
|
|
return None
|
|
|
|
for raw in self._extract_path_tokens(command):
|
|
resolved = self._resolve_command_path(raw, cwd_path)
|
|
if resolved and any(self._is_relative_to(resolved, root) for root in self.protected_paths):
|
|
return self._protected_write_error()
|
|
|
|
return None
|
|
|
|
def _is_blocked_clawhub_install(self, command: str, cwd_path: Path) -> bool:
|
|
lower = command.lower()
|
|
if "clawhub" not in lower or not re.search(r"\b(install|update)\b", lower):
|
|
return False
|
|
|
|
workdir = self._extract_flag_value(command, "--workdir")
|
|
if workdir:
|
|
resolved = self._resolve_command_path(workdir, cwd_path)
|
|
return any(
|
|
resolved == root.parent or self._is_relative_to(root, resolved)
|
|
for root in self.protected_paths
|
|
)
|
|
|
|
return any(cwd_path == root.parent for root in self.protected_paths)
|
|
|
|
@staticmethod
|
|
def _protected_write_error() -> str:
|
|
return (
|
|
"Error: Direct writes to workspace skills are blocked. "
|
|
"Stage the skill for review and require explicit user approval before installation."
|
|
)
|
|
|
|
@staticmethod
|
|
def _is_relative_to(path: Path, root: Path) -> bool:
|
|
try:
|
|
path.relative_to(root)
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
|
|
@staticmethod
|
|
def _extract_flag_value(command: str, flag: str) -> str | None:
|
|
tokens = ExecTool._tokenize(command)
|
|
for i, token in enumerate(tokens):
|
|
if token == flag and i + 1 < len(tokens):
|
|
return tokens[i + 1]
|
|
if token.startswith(flag + "="):
|
|
return token.split("=", 1)[1]
|
|
return None
|
|
|
|
@staticmethod
|
|
def _looks_like_write(command: str) -> bool:
|
|
lower = command.lower()
|
|
if re.search(r"(^|[^<])>>?\s*\S+", command):
|
|
return True
|
|
if re.search(r"\bsed\s+-i(?:\s|$)", lower):
|
|
return True
|
|
return bool(re.search(
|
|
r"\b(cp|mv|rm|mkdir|touch|install|tee|tar|unzip|zip|chmod|chown|git|python|python3|node|npx|bash|sh|zsh|pwsh|powershell)\b",
|
|
lower,
|
|
))
|
|
|
|
@staticmethod
|
|
def _extract_path_tokens(command: str) -> list[str]:
|
|
tokens = ExecTool._tokenize(command)
|
|
path_tokens: list[str] = []
|
|
skip_next = False
|
|
for i, token in enumerate(tokens):
|
|
if skip_next:
|
|
skip_next = False
|
|
continue
|
|
if token in {"--workdir", "-C"}:
|
|
if i + 1 < len(tokens):
|
|
path_tokens.append(tokens[i + 1])
|
|
skip_next = True
|
|
continue
|
|
if "=" in token:
|
|
key, value = token.split("=", 1)
|
|
if key in {"--workdir"}:
|
|
path_tokens.append(value)
|
|
continue
|
|
cleaned = token.strip("\"'")
|
|
if ExecTool._looks_like_path_token(cleaned):
|
|
path_tokens.append(cleaned)
|
|
return path_tokens
|
|
|
|
@staticmethod
|
|
def _looks_like_path_token(token: str) -> bool:
|
|
if not token or token in {".", ".."}:
|
|
return True
|
|
if token.startswith(("~", "/", "./", "../")):
|
|
return True
|
|
if re.match(r"^[A-Za-z]:\\", token):
|
|
return True
|
|
return "/" in token or "\\" in token
|
|
|
|
@staticmethod
|
|
def _resolve_command_path(raw: str, cwd_path: Path) -> Path | None:
|
|
token = raw.strip().strip("\"'")
|
|
if not token:
|
|
return None
|
|
try:
|
|
path = Path(token).expanduser()
|
|
if not path.is_absolute():
|
|
path = (cwd_path / path).resolve()
|
|
else:
|
|
path = path.resolve()
|
|
return path
|
|
except Exception:
|
|
return None
|
|
|
|
@staticmethod
|
|
def _tokenize(command: str) -> list[str]:
|
|
try:
|
|
return shlex.split(command, posix=os.name != "nt")
|
|
except ValueError:
|
|
return command.split()
|