150 lines
5.0 KiB
Python
150 lines
5.0 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
|
|
from beaver.tools import ObjectBackedTool, ToolContext
|
|
from beaver.tools.builtins import ListDirectoryTool, PatchFileTool, ReadFileTool, SearchFilesTool, WriteFileTool
|
|
|
|
|
|
def _run_tool(tool, arguments: dict, workspace: Path):
    """Invoke ``tool`` synchronously against ``workspace`` and return the result.

    The tool is wrapped in ``ObjectBackedTool``; whatever its ``invoke`` call
    returns for ``arguments`` and a ``ToolContext`` rooted at ``workspace`` is
    driven to completion with ``asyncio.run`` so tests can stay synchronous.
    """
    wrapped = ObjectBackedTool(tool)
    # ToolContext receives the workspace as a plain string, not a Path.
    context = ToolContext(workspace=str(workspace))
    pending = wrapped.invoke(arguments, context)
    return asyncio.run(pending)
|
|
|
|
|
|
def _payload(result):
    """Decode the JSON document carried in ``result.content``."""
    raw = result.content
    decoded = json.loads(raw)
    return decoded
|
|
|
|
|
|
def test_list_directory_is_workspace_scoped(tmp_path: Path) -> None:
    """Listing ``.`` succeeds and reports ``src`` followed by ``README.md``."""
    root = tmp_path / "workspace"
    root.mkdir()
    readme = root / "README.md"
    readme.write_text("# Hello\n", encoding="utf-8")
    source_dir = root / "src"
    source_dir.mkdir()

    outcome = _run_tool(ListDirectoryTool(), {"path": "."}, root)
    data = _payload(outcome)

    # Both the result object and its JSON payload must report success.
    assert outcome.success is True
    assert data["success"] is True
    listed_paths = [entry["path"] for entry in data["entries"]]
    assert listed_paths == ["src", "README.md"]
|
|
|
|
|
|
def test_read_file_returns_limited_text(tmp_path: Path) -> None:
    """Reading one line starting at line 2 yields only ``"two"``, flagged truncated."""
    root = tmp_path / "workspace"
    root.mkdir()
    notes = root / "notes.txt"
    notes.write_text("one\ntwo\nthree\n", encoding="utf-8")

    request = {"path": "notes.txt", "start_line": 2, "max_lines": 1}
    outcome = _run_tool(ReadFileTool(), request, root)
    data = _payload(outcome)

    assert outcome.success is True
    assert data["success"] is True
    # Only the second line comes back, so the reported window is 2..2.
    assert data["content"] == "two"
    assert data["start_line"] == 2
    assert data["end_line"] == 2
    # The file has a third line that was not returned.
    assert data["truncated"] is True
|
|
|
|
|
|
def test_search_files_finds_paths_and_content(tmp_path: Path) -> None:
    """Searching for ``docker`` reports a path match and a content match."""
    root = tmp_path / "workspace"
    root.mkdir()
    (root / "Dockerfile").write_text("FROM python:3.12\n", encoding="utf-8")
    package_dir = root / "src"
    package_dir.mkdir()
    (package_dir / "app.py").write_text("print('docker log')\n", encoding="utf-8")

    request = {"query": "docker", "max_results": 10}
    outcome = _run_tool(SearchFilesTool(), request, root)
    data = _payload(outcome)

    assert outcome.success is True
    assert data["success"] is True

    # Collapse the results into (path, match_type) pairs for membership checks.
    hits = {(item["path"], item["match_type"]) for item in data["results"]}
    assert ("Dockerfile", "path") in hits
    assert ("src/app.py", "content") in hits
|
|
|
|
|
|
def test_read_file_rejects_relative_path_escape(tmp_path: Path) -> None:
    """A ``../`` path pointing at a file beside the workspace is refused."""
    root = tmp_path / "workspace"
    root.mkdir()
    # The target lives in the workspace's parent directory.
    secret = tmp_path / "secret.txt"
    secret.write_text("secret\n", encoding="utf-8")

    outcome = _run_tool(ReadFileTool(), {"path": "../secret.txt"}, root)
    data = _payload(outcome)

    assert outcome.success is False
    assert data["success"] is False
    message = data["error"]
    assert "escapes workspace" in message
|
|
|
|
|
|
def test_read_file_rejects_absolute_path_escape(tmp_path: Path) -> None:
    """An absolute path to a file outside the workspace is refused."""
    root = tmp_path / "workspace"
    root.mkdir()
    external = tmp_path / "outside.txt"
    external.write_text("secret\n", encoding="utf-8")

    # Pass the outside file's full path as the path argument.
    request = {"path": str(external)}
    outcome = _run_tool(ReadFileTool(), request, root)
    data = _payload(outcome)

    assert outcome.success is False
    assert data["success"] is False
    message = data["error"]
    assert "escapes workspace" in message
|
|
|
|
|
|
def test_read_file_rejects_symlink_escape(tmp_path: Path) -> None:
    """A symlink inside the workspace that targets an outside file is refused.

    If the platform cannot create the symlink, the test is reported as
    skipped instead of silently passing, so a missing security check stays
    visible in the test report.
    """
    # Local import: only the skip path below needs it.
    import unittest

    workspace = tmp_path / "workspace"
    workspace.mkdir()
    outside = tmp_path / "outside.txt"
    outside.write_text("secret\n", encoding="utf-8")
    link = workspace / "outside-link.txt"
    try:
        os.symlink(outside, link)
    except (OSError, NotImplementedError) as exc:
        # pytest reports a raised unittest.SkipTest as a skipped test; a bare
        # ``return`` here would count as a pass although nothing was checked.
        raise unittest.SkipTest(f"symlinks are not available here: {exc}") from exc

    result = _run_tool(ReadFileTool(), {"path": "outside-link.txt"}, workspace)
    payload = _payload(result)

    assert result.success is False
    assert payload["success"] is False
    assert "escapes workspace" in payload["error"]
|
|
|
|
|
|
def test_read_file_rejects_binary_files(tmp_path: Path) -> None:
    """A file containing a NUL byte is refused with an error mentioning ``binary``."""
    root = tmp_path / "workspace"
    root.mkdir()
    blob = root / "blob.bin"
    blob.write_bytes(b"abc\x00def")

    outcome = _run_tool(ReadFileTool(), {"path": "blob.bin"}, root)
    data = _payload(outcome)

    assert outcome.success is False
    assert data["success"] is False
    message = data["error"]
    assert "binary" in message
|
|
|
|
|
|
def test_workspace_tools_reject_user_file_virtual_paths(tmp_path: Path) -> None:
    """Read, list, write and patch tools all refuse the virtual-path prefixes used here.

    Every call must fail with an error that mentions both
    ``personal agent file system path`` and ``user_files_read``.
    """
    root = tmp_path / "workspace"
    root.mkdir()

    # One (tool class, arguments) pair per tool; each is instantiated just before it runs.
    attempts = (
        (ReadFileTool, {"path": "uploads/get_helm.sh"}),
        (ListDirectoryTool, {"path": "outputs"}),
        (WriteFileTool, {"path": "shared/profile.json", "content": "{}"}),
        (PatchFileTool, {"path": "tasks/task-123/draft.md", "old_text": "a", "new_text": "b"}),
    )
    # Run every tool before asserting anything, in the order listed above.
    outcomes = [_run_tool(tool_cls(), arguments, root) for tool_cls, arguments in attempts]

    for outcome in outcomes:
        data = _payload(outcome)
        assert outcome.success is False
        assert data["success"] is False
        message = data["error"]
        assert "personal agent file system path" in message
        assert "user_files_read" in message
|