"""File storage and workspace browsing helpers for the web API."""

from __future__ import annotations

import codecs
import json
import mimetypes
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote

# Name of the sidecar file written next to every stored attachment.
_METADATA_FILENAME = "metadata.json"

# Characters allowed in a file id (lowercase hex, as produced by generate_file_id).
_HEX_DIGITS = frozenset("0123456789abcdef")

# Number of leading bytes inspected when sniffing for binary content.
_SNIFF_BYTES = 4096

# Non-"text/*" content types that are nevertheless always treated as text.
_TEXT_CONTENT_TYPES = frozenset(
    {
        "application/json",
        "application/javascript",
        "application/xml",
        "application/x-yaml",
    }
)

_DEFAULT_CONTENT_TYPE = "application/octet-stream"


def content_disposition(disposition: str, filename: str) -> str:
    """Build a Content-Disposition header, including RFC 5987 for non-ASCII names.

    Printable ASCII names are emitted as a quoted-string with ``\\`` and ``"``
    backslash-escaped. Names containing non-ASCII or control characters are
    emitted as a percent-encoded ``filename*=UTF-8''...`` parameter so they
    cannot break out of the header value.
    """
    has_control = any(ord(char) < 0x20 or ord(char) == 0x7F for char in filename)
    if filename.isascii() and not has_control:
        escaped = filename.replace("\\", "\\\\").replace('"', '\\"')
        return f'{disposition}; filename="{escaped}"'
    # safe="" so that "/" is percent-encoded too; it is not an RFC 5987 attr-char.
    utf8_quoted = quote(filename, safe="")
    return f"{disposition}; filename*=UTF-8''{utf8_quoted}"


def generate_file_id() -> str:
    """Generate a short unique file id (12 lowercase hex characters)."""
    return uuid.uuid4().hex[:12]


def save_file(
    workspace: Path,
    file_id: str,
    filename: str,
    content: bytes,
    content_type: str,
    session_id: str = "web:default",
) -> dict[str, Any]:
    """Save an uploaded attachment under workspace/files/<file_id>/<filename>.

    A ``metadata.json`` sidecar describing the attachment is written next to it.

    Returns:
        The metadata dict that was written to the sidecar.

    Raises:
        ValueError: If ``file_id`` is not lowercase hex, or ``filename`` is
            empty, contains a path separator, starts with ".", or collides
            with the metadata sidecar name.
    """
    # Ids that fail this check could never be read back (get_file_metadata
    # rejects them) and could escape the files directory.
    if not _is_safe_file_id(file_id):
        raise ValueError(f"Invalid file id: {file_id}")
    # A payload named like the sidecar would be overwritten by the metadata.
    if not _is_safe_filename(filename) or filename == _METADATA_FILENAME:
        raise ValueError(f"Invalid filename: {filename}")

    file_dir = _files_dir(workspace) / file_id
    file_dir.mkdir(parents=True, exist_ok=True)
    (file_dir / filename).write_bytes(content)

    metadata = {
        "file_id": file_id,
        "name": filename,
        "content_type": content_type,
        "size": len(content),
        "created_at": datetime.now(timezone.utc).isoformat(),
        "session_id": session_id,
    }
    (file_dir / _METADATA_FILENAME).write_text(
        json.dumps(metadata, ensure_ascii=False), encoding="utf-8"
    )
    return metadata


def get_file_metadata(workspace: Path, file_id: str) -> dict[str, Any] | None:
    """Load attachment metadata.

    Returns:
        The parsed metadata dict, or ``None`` when the id is not lowercase hex,
        the sidecar is missing or unparsable, or it does not hold a JSON object.
    """
    if not _is_safe_file_id(file_id):
        return None
    return _read_metadata(_files_dir(workspace) / file_id / _METADATA_FILENAME)


def get_file_path(workspace: Path, file_id: str) -> Path | None:
    """Resolve the stored attachment path.

    Returns:
        The path of the stored payload, or ``None`` when the metadata cannot be
        loaded, the recorded name points outside the attachment's own
        directory, or the payload is not an existing regular file.
    """
    meta = get_file_metadata(workspace, file_id)
    if meta is None:
        return None
    file_dir = _files_dir(workspace) / file_id
    file_path = file_dir / str(meta.get("name") or "")
    try:
        # The name comes from a JSON file on disk; never let it address
        # anything outside this attachment's directory.
        file_path.resolve().relative_to(file_dir.resolve())
    except ValueError:
        return None
    return file_path if file_path.exists() and file_path.is_file() else None


def list_files(workspace: Path, session_id: str | None = None) -> list[dict[str, Any]]:
    """List uploaded attachments, optionally filtered by session.

    Entries are visited in sorted directory order. Directories without a
    readable metadata object are skipped. A falsy ``session_id`` disables the
    filter.
    """
    result: list[dict[str, Any]] = []
    for entry in sorted(_files_dir(workspace).iterdir()):
        if not entry.is_dir():
            continue
        meta = _read_metadata(entry / _METADATA_FILENAME)
        if meta is None:
            continue
        if session_id and meta.get("session_id") != session_id:
            continue
        result.append(meta)
    return result


def delete_file(workspace: Path, file_id: str) -> bool:
    """Delete a stored attachment by id.

    Returns:
        ``True`` if the attachment directory existed and was removed,
        ``False`` for an invalid id or a missing directory.
    """
    if not _is_safe_file_id(file_id):
        return False
    file_dir = _files_dir(workspace) / file_id
    if not file_dir.exists():
        return False
    shutil.rmtree(file_dir)
    return True


def browse_workspace(workspace: Path, rel_path: str = "") -> dict[str, Any]:
    """List files and directories below the workspace root.

    Directories are listed before files, each group sorted case-insensitively
    by name. Entries whose name starts with "." are omitted, as are entries
    that are neither a directory nor a regular file.

    Returns:
        ``{"path": <workspace-relative path, "" for the root>, "items": [...]}``.

    Raises:
        ValueError: If ``rel_path`` escapes the workspace, is not a directory,
            or the directory cannot be read.
    """
    workspace = _ensure_workspace(workspace)
    target = _resolve_workspace_path(workspace, rel_path)
    if target is None or not target.is_dir():
        raise ValueError("Invalid directory path")

    try:
        entries = sorted(
            target.iterdir(),
            key=lambda entry: (not entry.is_dir(), entry.name.lower()),
        )
    except PermissionError as exc:
        raise ValueError("Permission denied") from exc

    items: list[dict[str, Any]] = []
    for entry in entries:
        if entry.name.startswith("."):
            continue
        rel = str(entry.relative_to(workspace))
        if entry.is_dir():
            items.append(
                {
                    "name": entry.name,
                    "path": rel,
                    "type": "directory",
                    "size": None,
                    "modified": _iso_utc(entry.stat().st_mtime),
                }
            )
        elif entry.is_file():
            stat = entry.stat()
            content_type, _ = mimetypes.guess_type(entry.name)
            items.append(
                {
                    "name": entry.name,
                    "path": rel,
                    "type": "file",
                    "size": stat.st_size,
                    "content_type": content_type or _DEFAULT_CONTENT_TYPE,
                    "modified": _iso_utc(stat.st_mtime),
                }
            )

    return {
        "path": str(target.relative_to(workspace)) if target != workspace else "",
        "items": items,
    }


def workspace_file_path(workspace: Path, rel_path: str) -> Path | None:
    """Resolve a workspace file path for download.

    Returns:
        The resolved path, or ``None`` when ``rel_path`` escapes the workspace
        or does not name an existing regular file.
    """
    workspace = _ensure_workspace(workspace)
    target = _resolve_workspace_path(workspace, rel_path)
    if target is None or not target.is_file():
        return None
    return target


def workspace_file_preview(
    workspace: Path, rel_path: str, *, max_bytes: int = 1024 * 1024
) -> dict[str, Any]:
    """Return a bounded preview payload for a workspace file.

    At most ``max_bytes`` bytes are read from disk. ``content`` is the decoded
    text (invalid bytes replaced) or ``None`` when the file looks binary.

    Raises:
        ValueError: If the file does not exist inside the workspace.
    """
    root = _ensure_workspace(workspace)
    file_path = workspace_file_path(root, rel_path)
    if file_path is None:
        raise ValueError("File not found")

    stat = file_path.stat()
    content_type, _ = mimetypes.guess_type(file_path.name)
    content_type = content_type or _DEFAULT_CONTENT_TYPE

    # Read only the preview window instead of loading the whole file and
    # slicing it afterwards; a negative limit is clamped to "read nothing".
    with file_path.open("rb") as handle:
        raw = handle.read(max(max_bytes, 0))
    is_truncated = stat.st_size > max_bytes

    is_binary = _is_probably_binary(raw, content_type, truncated=is_truncated)
    content = None if is_binary else raw.decode("utf-8", errors="replace")
    return {
        "name": file_path.name,
        "path": str(file_path.relative_to(root)),
        "size": stat.st_size,
        "content_type": content_type,
        "modified": _iso_utc(stat.st_mtime),
        "is_binary": is_binary,
        "is_truncated": is_truncated,
        "content": content,
    }


def save_to_workspace(
    workspace: Path, rel_dir: str, filename: str, content: bytes
) -> dict[str, Any]:
    """Save an uploaded file to a workspace directory.

    The target directory is created if needed, but only after the destination
    has been validated.

    Returns:
        A file entry dict (name, workspace-relative path, type, size,
        content_type, modified).

    Raises:
        ValueError: If ``filename`` is empty, ``rel_dir`` escapes the
            workspace, or the destination resolves outside the workspace or
            onto an existing directory.
    """
    if not filename:
        raise ValueError("Invalid filename")
    workspace = _ensure_workspace(workspace)
    target_dir = _resolve_workspace_path(workspace, rel_dir)
    if target_dir is None:
        raise ValueError("Invalid directory path")

    file_path = (target_dir / filename).resolve()
    try:
        file_path.relative_to(workspace)
    except ValueError as exc:
        raise ValueError("Invalid filename") from exc
    # Names such as "." or ".." can resolve to a directory inside the
    # workspace; report that as a bad filename rather than an OSError.
    if file_path.is_dir():
        raise ValueError("Invalid filename")

    target_dir.mkdir(parents=True, exist_ok=True)
    file_path.write_bytes(content)

    stat = file_path.stat()
    content_type, _ = mimetypes.guess_type(filename)
    return {
        "name": filename,
        "path": str(file_path.relative_to(workspace)),
        "type": "file",
        "size": stat.st_size,
        "content_type": content_type or _DEFAULT_CONTENT_TYPE,
        "modified": _iso_utc(stat.st_mtime),
    }


def delete_workspace_path(workspace: Path, rel_path: str) -> bool:
    """Delete a file or directory below workspace root.

    Returns:
        ``True`` if something was deleted. ``False`` when the path escapes the
        workspace, does not exist, or is the workspace root itself.
    """
    workspace = _ensure_workspace(workspace)
    target = _resolve_workspace_path(workspace, rel_path)
    if target is None or not target.exists() or target == workspace:
        return False
    if target.is_dir():
        shutil.rmtree(target)
    else:
        target.unlink()
    return True


def create_workspace_dir(workspace: Path, rel_path: str) -> dict[str, Any]:
    """Create a directory below workspace root.

    Returns:
        A directory entry dict (name, workspace-relative path, type, size,
        modified).

    Raises:
        ValueError: If ``rel_path`` escapes the workspace or names the root.
    """
    workspace = _ensure_workspace(workspace)
    target = _resolve_workspace_path(workspace, rel_path)
    if target is None or target == workspace:
        raise ValueError("Invalid directory path")
    target.mkdir(parents=True, exist_ok=True)
    stat = target.stat()
    return {
        "name": target.name,
        "path": str(target.relative_to(workspace)),
        "type": "directory",
        "size": None,
        "modified": _iso_utc(stat.st_mtime),
    }


def _iso_utc(timestamp: float) -> str:
    """Format a POSIX timestamp as an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()


def _read_metadata(meta_path: Path) -> dict[str, Any] | None:
    """Load a metadata sidecar; return None if missing, unparsable, or not an object."""
    if not meta_path.exists():
        return None
    try:
        data = json.loads(meta_path.read_text(encoding="utf-8"))
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        return None
    return data if isinstance(data, dict) else None


def _files_dir(workspace: Path) -> Path:
    """Return the attachment directory (<workspace>/files), creating it if needed."""
    directory = _ensure_workspace(workspace) / "files"
    directory.mkdir(parents=True, exist_ok=True)
    return directory


def _ensure_workspace(workspace: Path) -> Path:
    """Create the workspace root if needed and return its resolved path."""
    root = Path(workspace).expanduser()
    root.mkdir(parents=True, exist_ok=True)
    return root.resolve()


def _resolve_workspace_path(workspace: Path, rel_path: str) -> Path | None:
    """Resolve rel_path against the workspace; return None if it escapes the root."""
    root = _ensure_workspace(workspace)
    target = (root / rel_path).resolve()
    try:
        target.relative_to(root)
    except ValueError:
        return None
    return target


def _is_probably_binary(raw: bytes, content_type: str, *, truncated: bool = False) -> bool:
    """Guess whether raw is binary, from its content type and leading bytes.

    Text-like content types and empty payloads are never binary. Otherwise the
    first 4096 bytes are binary if they contain a NUL byte or are not valid
    UTF-8. ``truncated`` signals that ``raw`` itself is only a prefix of the
    real content.
    """
    if content_type.startswith("text/") or content_type in _TEXT_CONTENT_TYPES:
        return False
    if not raw:
        return False
    sample = raw[:_SNIFF_BYTES]
    if b"\x00" in sample:
        return True
    # When the sample is a prefix of longer content, a multi-byte character may
    # be cut at the boundary; an incremental decoder with final=False accepts
    # such an incomplete tail while still rejecting invalid sequences.
    cut_short = truncated or len(raw) > len(sample)
    decoder = codecs.getincrementaldecoder("utf-8")()
    try:
        decoder.decode(sample, final=not cut_short)
    except UnicodeDecodeError:
        return True
    return False


def _is_safe_filename(filename: str) -> bool:
    """Accept non-empty names with no path separators and no leading dot."""
    return (
        bool(filename)
        and "/" not in filename
        and "\\" not in filename
        and not filename.startswith(".")
    )


def _is_safe_file_id(file_id: str) -> bool:
    """Accept non-empty ids made only of lowercase hex digits."""
    return bool(file_id) and all(char in _HEX_DIGITS for char in file_id)