修改了nanobot,往Hermes agent的风格走,进度1/3
This commit is contained in:
275
app-instance/backend-old/nanobot/agent/tools/filesystem.py
Normal file
275
app-instance/backend-old/nanobot/agent/tools/filesystem.py
Normal file
@ -0,0 +1,275 @@
|
||||
"""File system tools: read, write, edit."""
|
||||
|
||||
import difflib
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from nanobot.agent.tools.base import Tool
|
||||
|
||||
|
||||
def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | None = None) -> Path:
|
||||
"""Resolve path against workspace (if relative) and enforce directory restriction."""
|
||||
p = Path(path).expanduser()
|
||||
if not p.is_absolute() and workspace:
|
||||
p = workspace / p
|
||||
resolved = p.resolve()
|
||||
if allowed_dir:
|
||||
try:
|
||||
resolved.relative_to(allowed_dir.resolve())
|
||||
except ValueError:
|
||||
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
|
||||
return resolved
|
||||
|
||||
|
||||
def _is_relative_to(path: Path, root: Path) -> bool:
|
||||
try:
|
||||
path.relative_to(root.resolve())
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def _protected_write_error() -> str:
|
||||
return (
|
||||
"Error: Direct writes to workspace skills are blocked. "
|
||||
"Stage the skill for review and require explicit user approval before installation."
|
||||
)
|
||||
|
||||
|
||||
class ReadFileTool(Tool):
|
||||
"""Tool to read file contents."""
|
||||
|
||||
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
|
||||
self._workspace = workspace
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "read_file"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "Read the contents of a file at the given path."
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "The file path to read"
|
||||
}
|
||||
},
|
||||
"required": ["path"]
|
||||
}
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
if not file_path.is_file():
|
||||
return f"Error: Not a file: {path}"
|
||||
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
return content
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error reading file: {str(e)}"
|
||||
|
||||
|
||||
class WriteFileTool(Tool):
|
||||
"""Tool to write content to a file."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace: Path | None = None,
|
||||
allowed_dir: Path | None = None,
|
||||
protected_paths: list[Path] | None = None,
|
||||
):
|
||||
self._workspace = workspace
|
||||
self._allowed_dir = allowed_dir
|
||||
self._protected_paths = [p.expanduser().resolve() for p in protected_paths or []]
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "write_file"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "Write content to a file at the given path. Creates parent directories if needed."
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "The file path to write to"
|
||||
},
|
||||
"content": {
|
||||
"type": "string",
|
||||
"description": "The content to write"
|
||||
}
|
||||
},
|
||||
"required": ["path", "content"]
|
||||
}
|
||||
|
||||
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
|
||||
if any(_is_relative_to(file_path, protected) for protected in self._protected_paths):
|
||||
return _protected_write_error()
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_path.write_text(content, encoding="utf-8")
|
||||
return f"Successfully wrote {len(content)} bytes to {file_path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error writing file: {str(e)}"
|
||||
|
||||
|
||||
class EditFileTool(Tool):
|
||||
"""Tool to edit a file by replacing text."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
workspace: Path | None = None,
|
||||
allowed_dir: Path | None = None,
|
||||
protected_paths: list[Path] | None = None,
|
||||
):
|
||||
self._workspace = workspace
|
||||
self._allowed_dir = allowed_dir
|
||||
self._protected_paths = [p.expanduser().resolve() for p in protected_paths or []]
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "edit_file"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "Edit a file by replacing old_text with new_text. The old_text must exist exactly in the file."
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "The file path to edit"
|
||||
},
|
||||
"old_text": {
|
||||
"type": "string",
|
||||
"description": "The exact text to find and replace"
|
||||
},
|
||||
"new_text": {
|
||||
"type": "string",
|
||||
"description": "The text to replace with"
|
||||
}
|
||||
},
|
||||
"required": ["path", "old_text", "new_text"]
|
||||
}
|
||||
|
||||
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
|
||||
if any(_is_relative_to(file_path, protected) for protected in self._protected_paths):
|
||||
return _protected_write_error()
|
||||
if not file_path.exists():
|
||||
return f"Error: File not found: {path}"
|
||||
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
|
||||
if old_text not in content:
|
||||
return self._not_found_message(old_text, content, path)
|
||||
|
||||
# Count occurrences
|
||||
count = content.count(old_text)
|
||||
if count > 1:
|
||||
return f"Warning: old_text appears {count} times. Please provide more context to make it unique."
|
||||
|
||||
new_content = content.replace(old_text, new_text, 1)
|
||||
file_path.write_text(new_content, encoding="utf-8")
|
||||
|
||||
return f"Successfully edited {file_path}"
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error editing file: {str(e)}"
|
||||
|
||||
@staticmethod
|
||||
def _not_found_message(old_text: str, content: str, path: str) -> str:
|
||||
"""Build a helpful error when old_text is not found."""
|
||||
lines = content.splitlines(keepends=True)
|
||||
old_lines = old_text.splitlines(keepends=True)
|
||||
window = len(old_lines)
|
||||
|
||||
best_ratio, best_start = 0.0, 0
|
||||
for i in range(max(1, len(lines) - window + 1)):
|
||||
ratio = difflib.SequenceMatcher(None, old_lines, lines[i : i + window]).ratio()
|
||||
if ratio > best_ratio:
|
||||
best_ratio, best_start = ratio, i
|
||||
|
||||
if best_ratio > 0.5:
|
||||
diff = "\n".join(difflib.unified_diff(
|
||||
old_lines, lines[best_start : best_start + window],
|
||||
fromfile="old_text (provided)", tofile=f"{path} (actual, line {best_start + 1})",
|
||||
lineterm="",
|
||||
))
|
||||
return f"Error: old_text not found in {path}.\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}"
|
||||
return f"Error: old_text not found in {path}. No similar text found. Verify the file content."
|
||||
|
||||
|
||||
class ListDirTool(Tool):
|
||||
"""Tool to list directory contents."""
|
||||
|
||||
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
|
||||
self._workspace = workspace
|
||||
self._allowed_dir = allowed_dir
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "list_dir"
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
return "List the contents of a directory."
|
||||
|
||||
@property
|
||||
def parameters(self) -> dict[str, Any]:
|
||||
return {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string",
|
||||
"description": "The directory path to list"
|
||||
}
|
||||
},
|
||||
"required": ["path"]
|
||||
}
|
||||
|
||||
async def execute(self, path: str, **kwargs: Any) -> str:
|
||||
try:
|
||||
dir_path = _resolve_path(path, self._workspace, self._allowed_dir)
|
||||
if not dir_path.exists():
|
||||
return f"Error: Directory not found: {path}"
|
||||
if not dir_path.is_dir():
|
||||
return f"Error: Not a directory: {path}"
|
||||
|
||||
items = []
|
||||
for item in sorted(dir_path.iterdir()):
|
||||
prefix = "📁 " if item.is_dir() else "📄 "
|
||||
items.append(f"{prefix}{item.name}")
|
||||
|
||||
if not items:
|
||||
return f"Directory {path} is empty"
|
||||
|
||||
return "\n".join(items)
|
||||
except PermissionError as e:
|
||||
return f"Error: {e}"
|
||||
except Exception as e:
|
||||
return f"Error listing directory: {str(e)}"
|
||||
Reference in New Issue
Block a user