diff --git a/.env.example b/.env.example index 23495a6..de013cc 100644 --- a/.env.example +++ b/.env.example @@ -10,6 +10,17 @@ MEMORY_GATEWAY_STORAGE_DIR=./data/storage # Number of resource session IDs sent per EverOS search request. MEMORY_GATEWAY_RESOURCE_SEARCH_BATCH_SIZE=50 +# Max upload size in bytes. Default here is 25 MiB. +MEMORY_GATEWAY_MAX_UPLOAD_BYTES=26214400 + +# Comma-separated MIME allowlist. Prefix wildcards such as image/* are supported. +MEMORY_GATEWAY_ALLOWED_MIME_TYPES=image/*,audio/*,application/pdf,text/html,application/xhtml+xml,text/plain,text/markdown,text/csv,application/json,application/msword,application/vnd.openxmlformats-officedocument.wordprocessingml.document,application/vnd.ms-excel,application/vnd.openxmlformats-officedocument.spreadsheetml.sheet,application/vnd.ms-powerpoint,application/vnd.openxmlformats-officedocument.presentationml.presentation + +# EverOS add/flush retry policy during resource ingestion. +MEMORY_GATEWAY_EVEROS_INGEST_ATTEMPTS=3 +MEMORY_GATEWAY_EVEROS_RETRY_DELAY_SECONDS=0.25 +MEMORY_GATEWAY_EVEROS_TIMEOUT_SECONDS=120 + # API server settings used by python main.py. MEMORY_GATEWAY_HOST=0.0.0.0 MEMORY_GATEWAY_PORT=8010 diff --git a/README.md b/README.md index a4c0297..f6f8bf5 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,9 @@ Memory Gateway 2 是一个轻量级 FastAPI 服务,用于在 EverOS 现有 - 上传用户资源:文件、图片、音频、PDF、HTML、普通文档、纯文本。 - 保存资源元数据到 SQLite。 - 为每个资源生成独立 EverOS `session_id`。 -- 调用 EverOS `add` 和 `flush` 完成资源记忆摄入。 +- 调用 EverOS `add` 和 `flush` 完成资源记忆摄入,并对临时失败做轻量重试。 - 提供资源列表、详情、软删除。 +- 支持上传大小限制、MIME 白名单、同用户同 app/project 下按 sha256 幂等复用资源。 - 编排记忆搜索,支持当前聊天、资源记忆、全部用户记忆。 - 支持记忆 tombstone 软删除。 - 支持记忆手动 override。 @@ -53,6 +54,11 @@ cp .env.example .env | `MEMORY_GATEWAY_DB_PATH` | `./data/memory_gateway.sqlite3` | Gateway 自己的 SQLite 数据库路径 | | `MEMORY_GATEWAY_STORAGE_DIR` | `./data/storage` | 用户上传原始文件保存路径 | | `MEMORY_GATEWAY_RESOURCE_SEARCH_BATCH_SIZE` | `50` | resources scope 搜索时每批 session_id 数量 | +| `MEMORY_GATEWAY_MAX_UPLOAD_BYTES` | `26214400` | 单个上传文件最大字节数,默认 25MB | +| `MEMORY_GATEWAY_ALLOWED_MIME_TYPES` | 常见图片、音频、PDF、HTML、文本和 Office 文档 | 逗号分隔的上传 MIME 白名单,支持 `image/*` 这类前缀匹配 | +| `MEMORY_GATEWAY_EVEROS_INGEST_ATTEMPTS` | `3` | EverOS `add` 和 `flush` 各自最多重试次数 | +| `MEMORY_GATEWAY_EVEROS_RETRY_DELAY_SECONDS` | `0.25` | EverOS 摄入重试间隔秒数 | +| `MEMORY_GATEWAY_EVEROS_TIMEOUT_SECONDS` | `120` | 单次 EverOS HTTP 请求超时秒数 | | `MEMORY_GATEWAY_HOST` | `127.0.0.1` | Gateway API 监听地址 | | `MEMORY_GATEWAY_PORT` | `8010` | Gateway API 监听端口 | | `MEMORY_GATEWAY_RELOAD` | `false` | 是否启用 uvicorn reload,开发时可设为 `true` | @@ -223,6 +229,13 @@ Content-Type: multipart/form-data 8. 调用 EverOS `/api/v1/memory/flush`。 9. 成功后状态改为 `extracted`,失败后状态改为 `failed`。 +上传策略: + +- 文件会按流式方式写入磁盘,超过 `MEMORY_GATEWAY_MAX_UPLOAD_BYTES` 会返回 `413`,不会写入资源记录。 +- MIME 类型不在 `MEMORY_GATEWAY_ALLOWED_MIME_TYPES` 白名单内会返回 `415`。 +- 同一用户在同一 `app_id`、`project_id` 下重复上传相同 sha256 的活跃资源,会直接返回已有资源,避免重复调用 EverOS 摄入。 +- EverOS `add` 和 `flush` 临时失败时会分别按配置重试;单次请求受 `MEMORY_GATEWAY_EVEROS_TIMEOUT_SECONDS` 控制;全部失败后资源状态为 `failed`,并记录 `error_message`。 + content type 映射: | 文件类型 | EverOS content type | @@ -381,6 +394,7 @@ DELETE /resources/{resource_id}?user_id={user_id}&user_key={user_key} - 设置 `deleted_at = now()`。 - 设置 `status = deleted`。 - 后续 `resources` scope 搜索会排除该资源的 `session_id`。 +- 清理 Gateway 自己在 `MEMORY_GATEWAY_STORAGE_DIR` 下保存的原始上传文件。 - 不物理删除 EverOS 内部记忆或索引。 请求示例: @@ -490,10 +504,10 @@ Content-Type: application/json |---|---|---|---| | `user_id` | string | 是 | 用户 ID | | `user_key` | string | 是 | 用户 key | -| `session_id` | string | 否 | memory 所属 session | +| `session_id` | string | 是 | memory 所属 session,必须属于当前用户 | | `override_text` | string | 是 | 修正后的记忆文本 | -该接口只写入或更新 `memory_overrides`,不会修改 EverOS 原始文件。后续搜索结果命中该 `memory_id` 时,返回的 `text` 会替换为 `override_text`,但保留原始 memory id。 +该接口只写入或更新 `memory_overrides`,不会修改 EverOS 原始文件。写入前会校验 `session_id` 属于当前用户:当前版本支持当前用户的 `resource:{user_id}:{resource_id}` 和 `memory_edit:{user_id}`。后续搜索结果命中该 `memory_id` 时,返回的 `text` 会替换为 `override_text`,但保留原始 memory id。 请求示例: @@ -531,10 +545,10 @@ Content-Type: application/json |---|---|---|---| | `user_id` | string | 是 | 用户 ID | | `user_key` | string | 是 | 用户 key | -| `session_id` | string | 否 | memory 所属 session | +| `session_id` | string | 是 | memory 所属 session,必须属于当前用户 | | `reason` | string | 否 | 删除原因 | -该接口只写入 `memory_tombstones`,不会修改 EverOS 原始文件。后续搜索结果如果命中 tombstone 的 `memory_id` 或 `session_id`,会被过滤。 +该接口只写入 `memory_tombstones`,不会修改 EverOS 原始文件。写入前会校验 `session_id` 属于当前用户:当前版本支持当前用户的 `resource:{user_id}:{resource_id}` 和 `memory_edit:{user_id}`。后续搜索结果如果命中 tombstone 的 `memory_id` 或 `session_id`,会被过滤。 请求示例: diff --git a/core/api.py b/core/api.py index 49358eb..6b94530 100644 --- a/core/api.py +++ b/core/api.py @@ -9,7 +9,7 @@ from .config import GatewayConfig from .db import init_db from .everos_client import EverOSClient from .repository import MemoryRepository -from .service import MemoryGatewayService +from .service import MemoryGatewayService, UnsupportedContentType, UploadTooLarge class SearchMemoriesRequest(BaseModel): @@ -28,14 +28,14 @@ class SearchMemoriesRequest(BaseModel): class MemoryOverrideRequest(BaseModel): user_id: str = Field(min_length=1) user_key: str = Field(min_length=1) - session_id: str | None = None + session_id: str = Field(min_length=1) override_text: str = Field(min_length=1) class MemoryDeleteRequest(BaseModel): user_id: str = Field(min_length=1) user_key: str = Field(min_length=1) - session_id: str | None = None + session_id: str = Field(min_length=1) reason: str | None = None @@ -51,7 +51,10 @@ def create_app( cfg = config or GatewayConfig.from_env() init_db(cfg.database_path) repository = MemoryRepository(cfg.database_path) - client = everos_client or EverOSClient(cfg.everos_base_url) + client = everos_client or EverOSClient( + cfg.everos_base_url, + timeout=cfg.everos_timeout_seconds, + ) service = MemoryGatewayService(cfg, repository, client) app = FastAPI(title="memory-gateway2", version="0.1.0") @@ -105,14 +108,19 @@ def create_app( file: UploadFile = File(...), ) -> dict[str, Any]: require_user(user_id, user_key) - return await service.upload_resource( - user_id=user_id, - app_id=app_id, - project_id=project_id, - file=file, - title=title, - description=description, - ) + try: + return await service.upload_resource( + user_id=user_id, + app_id=app_id, + project_id=project_id, + file=file, + title=title, + description=description, + ) + except UploadTooLarge as exc: + raise HTTPException(status_code=413, detail=str(exc)) from exc + except UnsupportedContentType as exc: + raise HTTPException(status_code=415, detail=str(exc)) from exc @router.get("/resources") async def list_resources( @@ -167,6 +175,10 @@ def create_app( request: MemoryOverrideRequest, ) -> dict[str, Any]: require_user(request.user_id, request.user_key) + try: + service.assert_memory_session_owned(request.user_id, request.session_id) + except PermissionError as exc: + raise HTTPException(status_code=403, detail=str(exc)) from exc return service.upsert_override( user_id=request.user_id, memory_id=memory_id, @@ -180,6 +192,10 @@ def create_app( request: MemoryDeleteRequest, ) -> dict[str, Any]: require_user(request.user_id, request.user_key) + try: + service.assert_memory_session_owned(request.user_id, request.session_id) + except PermissionError as exc: + raise HTTPException(status_code=403, detail=str(exc)) from exc return service.delete_memory( user_id=request.user_id, memory_id=memory_id, diff --git a/core/config.py b/core/config.py index be96404..03cc8bf 100644 --- a/core/config.py +++ b/core/config.py @@ -6,6 +6,23 @@ from pathlib import Path _PROJECT_ROOT = Path(__file__).resolve().parents[1] +_DEFAULT_ALLOWED_MIME_TYPES = ( + "image/*", + "audio/*", + "application/pdf", + "text/html", + "application/xhtml+xml", + "text/plain", + "text/markdown", + "text/csv", + "application/json", + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/vnd.ms-powerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", +) @dataclass(frozen=True) @@ -14,9 +31,22 @@ class GatewayConfig: database_path: Path = _PROJECT_ROOT / "data" / "memory_gateway.sqlite3" storage_dir: Path = _PROJECT_ROOT / "data" / "storage" resource_search_batch_size: int = 50 + max_upload_bytes: int = 25 * 1024 * 1024 + allowed_mime_types: tuple[str, ...] = _DEFAULT_ALLOWED_MIME_TYPES + everos_ingest_attempts: int = 3 + everos_retry_delay_seconds: float = 0.25 + everos_timeout_seconds: float = 120.0 @classmethod def from_env(cls) -> GatewayConfig: + allowed_mime_types = tuple( + item.strip() + for item in os.environ.get( + "MEMORY_GATEWAY_ALLOWED_MIME_TYPES", + ",".join(_DEFAULT_ALLOWED_MIME_TYPES), + ).split(",") + if item.strip() + ) return cls( everos_base_url=os.environ.get( "EVEROS_BASE_URL", @@ -37,4 +67,17 @@ class GatewayConfig: resource_search_batch_size=int( os.environ.get("MEMORY_GATEWAY_RESOURCE_SEARCH_BATCH_SIZE", "50") ), + max_upload_bytes=int( + os.environ.get("MEMORY_GATEWAY_MAX_UPLOAD_BYTES", str(25 * 1024 * 1024)) + ), + allowed_mime_types=allowed_mime_types, + everos_ingest_attempts=int( + os.environ.get("MEMORY_GATEWAY_EVEROS_INGEST_ATTEMPTS", "3") + ), + everos_retry_delay_seconds=float( + os.environ.get("MEMORY_GATEWAY_EVEROS_RETRY_DELAY_SECONDS", "0.25") + ), + everos_timeout_seconds=float( + os.environ.get("MEMORY_GATEWAY_EVEROS_TIMEOUT_SECONDS", "120") + ), ) diff --git a/core/repository.py b/core/repository.py index 262011f..47b22a2 100644 --- a/core/repository.py +++ b/core/repository.py @@ -157,6 +157,31 @@ class MemoryRepository: ).fetchone() return _row_to_dict(row) + def find_active_resource_by_sha256( + self, + *, + user_id: str, + app_id: str, + project_id: str, + sha256: str, + ) -> dict[str, Any] | None: + with connect(self.db_path) as conn: + row = conn.execute( + """ + SELECT * FROM user_resources + WHERE user_id = ? + AND app_id = ? + AND project_id = ? + AND sha256 = ? + AND deleted_at IS NULL + AND status IN ('ingesting', 'extracted') + ORDER BY created_at ASC + LIMIT 1 + """, + (user_id, app_id, project_id, sha256), + ).fetchone() + return _row_to_dict(row) + def list_resources(self, user_id: str) -> list[dict[str, Any]]: with connect(self.db_path) as conn: rows = conn.execute( diff --git a/core/service.py b/core/service.py index a62b921..f37b8b6 100644 --- a/core/service.py +++ b/core/service.py @@ -1,11 +1,14 @@ from __future__ import annotations +import asyncio import hashlib import mimetypes import secrets +import shutil import uuid from pathlib import Path from typing import Any +from urllib.parse import unquote, urlparse from fastapi import UploadFile @@ -46,19 +49,71 @@ def _safe_filename(filename: str | None) -> str: return name or "upload.bin" -def _copy_upload(file: UploadFile, destination: Path) -> tuple[str, int]: +class UploadTooLarge(ValueError): + pass + + +class UnsupportedContentType(ValueError): + pass + + +def _copy_upload( + file: UploadFile, + destination: Path, + max_upload_bytes: int, + cleanup_root: Path, +) -> tuple[str, int]: sha256 = hashlib.sha256() size = 0 destination.parent.mkdir(parents=True, exist_ok=True) - with destination.open("wb") as out: - while True: - chunk = file.file.read(1024 * 1024) - if not chunk: - break - size += len(chunk) - sha256.update(chunk) - out.write(chunk) - return sha256.hexdigest(), size + try: + with destination.open("wb") as out: + while True: + chunk = file.file.read(1024 * 1024) + if not chunk: + break + size += len(chunk) + if size > max_upload_bytes: + raise UploadTooLarge( + f"upload exceeds max size of {max_upload_bytes} bytes" + ) + sha256.update(chunk) + out.write(chunk) + return sha256.hexdigest(), size + except Exception: + destination.unlink(missing_ok=True) + _remove_empty_parents(destination.parent, stop_at=cleanup_root) + raise + + +def _mime_allowed(mime_type: str | None, allowed_mime_types: tuple[str, ...]) -> bool: + mime = (mime_type or "").lower() + if not mime: + return False + for allowed in allowed_mime_types: + item = allowed.lower() + if item.endswith("/*") and mime.startswith(item[:-1]): + return True + if item == mime: + return True + return False + + +def _remove_empty_parents(path: Path, stop_at: Path | None = None) -> None: + current = path + stop = stop_at.resolve() if stop_at is not None else None + while True: + try: + resolved = current.resolve() + if stop is not None and resolved == stop: + return + current.rmdir() + except OSError: + return + parent = current.parent + if parent == current: + return + current = parent class MemoryGatewayService: @@ -101,9 +156,26 @@ class MemoryGatewayService: session_id = resource_session_id(user_id, resource_id) original_filename = _safe_filename(file.filename) mime_type = file.content_type or mimetypes.guess_type(original_filename)[0] + if not _mime_allowed(mime_type, self.config.allowed_mime_types): + raise UnsupportedContentType(f"unsupported content type: {mime_type}") content_type = infer_content_type(original_filename, mime_type) stored_path = self.config.storage_dir / user_id / resource_id / original_filename - sha256, size_bytes = _copy_upload(file, stored_path) + sha256, size_bytes = _copy_upload( + file, + stored_path, + self.config.max_upload_bytes, + self.config.storage_dir, + ) + existing = self.repository.find_active_resource_by_sha256( + user_id=user_id, + app_id=app_id, + project_id=project_id, + sha256=sha256, + ) + if existing is not None: + shutil.rmtree(stored_path.parent, ignore_errors=True) + return self._resource_summary(existing) + internal_uri = stored_path.resolve().as_uri() resource = self.repository.create_resource( @@ -126,16 +198,20 @@ class MemoryGatewayService: ) try: - await self.everos_client.add_memory( - self._build_add_payload( - resource=resource, - user_id=user_id, - app_id=app_id, - project_id=project_id, - filename=original_filename, + await self._retry_everos_call( + lambda: self.everos_client.add_memory( + self._build_add_payload( + resource=resource, + user_id=user_id, + app_id=app_id, + project_id=project_id, + filename=original_filename, + ) ) ) - await self.everos_client.flush_memory(session_id, app_id, project_id) + await self._retry_everos_call( + lambda: self.everos_client.flush_memory(session_id, app_id, project_id) + ) except Exception as exc: failed = self.repository.update_resource_status( resource_id, @@ -147,6 +223,23 @@ class MemoryGatewayService: extracted = self.repository.update_resource_status(resource_id, "extracted") return self._resource_summary(extracted or resource) + async def _retry_everos_call(self, operation: Any) -> Any: + attempts = max(1, self.config.everos_ingest_attempts) + last_error: Exception | None = None + for attempt in range(attempts): + try: + return await operation() + except Exception as exc: + last_error = exc + if attempt == attempts - 1: + break + delay = self.config.everos_retry_delay_seconds + if delay > 0: + await asyncio.sleep(delay) + if last_error is None: + raise RuntimeError("EverOS operation failed") + raise last_error + def _build_add_payload( self, *, @@ -199,8 +292,29 @@ class MemoryGatewayService: if before is None: return None resource = self.repository.soft_delete_resource(resource_id, user_id) + self._cleanup_resource_file(before) return self._resource_summary(resource) + def _cleanup_resource_file(self, resource: dict[str, Any]) -> None: + uri = str(resource.get("uri") or "") + if not uri.startswith("file://"): + return + parsed = urlparse(uri) + if parsed.scheme != "file": + return + try: + path = Path(unquote(parsed.path)).resolve() + storage_root = self.config.storage_dir.resolve() + except OSError: + return + if not path.is_relative_to(storage_root): + return + try: + path.unlink(missing_ok=True) + except OSError: + return + _remove_empty_parents(path.parent, stop_at=storage_root) + async def search_memories( self, *, @@ -394,6 +508,18 @@ class MemoryGatewayService: ) return {"memory_id": memory_id, "override_id": override["id"], "status": "active"} + def assert_memory_session_owned(self, user_id: str, session_id: str) -> None: + if session_id == f"memory_edit:{user_id}": + return + if session_id.startswith("resource:"): + resource = self.repository.get_resource_by_session_for_user( + session_id, + user_id, + ) + if resource is not None: + return + raise PermissionError("memory session does not belong to user") + def delete_memory( self, *, diff --git a/tests/test_gateway.py b/tests/test_gateway.py index 637b8ca..f142100 100644 --- a/tests/test_gateway.py +++ b/tests/test_gateway.py @@ -17,15 +17,22 @@ class FakeEverOSClient: self, search_results: list[dict[str, Any]] | None = None, health_error: Exception | None = None, + add_failures: int = 0, + flush_failures: int = 0, ) -> None: self.add_calls: list[dict[str, Any]] = [] self.flush_calls: list[dict[str, str]] = [] self.search_calls: list[dict[str, Any]] = [] self.search_results = search_results or [] self.health_error = health_error + self.add_failures = add_failures + self.flush_failures = flush_failures async def add_memory(self, payload: dict[str, Any]) -> dict[str, Any]: self.add_calls.append(payload) + if self.add_failures > 0: + self.add_failures -= 1 + raise RuntimeError("temporary add failure") return {"request_id": "add", "data": {"status": "accumulated"}} async def flush_memory( @@ -37,6 +44,9 @@ class FakeEverOSClient: self.flush_calls.append( {"session_id": session_id, "app_id": app_id, "project_id": project_id} ) + if self.flush_failures > 0: + self.flush_failures -= 1 + raise RuntimeError("temporary flush failure") return {"request_id": "flush", "data": {"status": "extracted"}} async def search_memory(self, payload: dict[str, Any]) -> dict[str, Any]: @@ -73,6 +83,33 @@ def app_client( return httpx.AsyncClient(transport=transport, base_url="http://test") +def create_test_resource( + repo: MemoryRepository, + *, + resource_id: str, + user_id: str, + uri: str = "file:///private/a.txt", +) -> None: + repo.create_resource( + id=resource_id, + user_id=user_id, + app_id="default", + project_id="default", + session_id=f"resource:{user_id}:{resource_id}", + original_filename="a.txt", + mime_type="text/plain", + content_type="text", + uri=uri, + uri_public=False, + sha256=f"sha-{resource_id}", + size_bytes=1, + title=None, + description=None, + status="extracted", + error_message=None, + ) + + async def create_user(client: httpx.AsyncClient, user_id: str = "u_123") -> str: response = await client.post("/users", json={"user_id": user_id}) assert response.status_code == 200, response.text @@ -82,6 +119,18 @@ async def create_user(client: httpx.AsyncClient, user_id: str = "u_123") -> str: return body["user_key"] +def test_create_app_uses_configured_everos_timeout(config: GatewayConfig) -> None: + config = GatewayConfig( + everos_base_url=config.everos_base_url, + database_path=config.database_path, + storage_dir=config.storage_dir, + everos_timeout_seconds=7.5, + ) + app = create_app(config=config) + + assert app.state.everos_client.timeout == 7.5 + + @pytest.mark.asyncio async def test_health_reports_api_and_everos_ok( config: GatewayConfig, @@ -179,6 +228,102 @@ async def test_upload_resource_creates_record_and_calls_everos( ] +@pytest.mark.asyncio +async def test_upload_retries_transient_everos_failure( + config: GatewayConfig, +) -> None: + config = GatewayConfig( + everos_base_url=config.everos_base_url, + database_path=config.database_path, + storage_dir=config.storage_dir, + everos_ingest_attempts=2, + everos_retry_delay_seconds=0, + ) + everos = FakeEverOSClient(add_failures=1, flush_failures=1) + async with app_client(config, everos) as client: + user_key = await create_user(client) + response = await client.post( + "/resources", + data={"user_id": "u_123", "user_key": user_key}, + files={"file": ("retry.txt", b"retry me", "text/plain")}, + ) + + assert response.status_code == 200, response.text + assert response.json()["status"] == "extracted" + assert len(everos.add_calls) == 2 + assert len(everos.flush_calls) == 2 + + +@pytest.mark.asyncio +async def test_upload_duplicate_resource_is_idempotent_for_same_user( + config: GatewayConfig, +) -> None: + everos = FakeEverOSClient() + async with app_client(config, everos) as client: + user_key = await create_user(client) + first = await client.post( + "/resources", + data={"user_id": "u_123", "user_key": user_key}, + files={"file": ("same.txt", b"same bytes", "text/plain")}, + ) + second = await client.post( + "/resources", + data={"user_id": "u_123", "user_key": user_key}, + files={"file": ("same.txt", b"same bytes", "text/plain")}, + ) + + assert first.status_code == 200, first.text + assert second.status_code == 200, second.text + assert second.json()["resource_id"] == first.json()["resource_id"] + assert len(everos.add_calls) == 1 + assert len(everos.flush_calls) == 1 + + +@pytest.mark.asyncio +async def test_upload_rejects_file_larger_than_configured_limit( + config: GatewayConfig, + repo: MemoryRepository, +) -> None: + config = GatewayConfig( + everos_base_url=config.everos_base_url, + database_path=config.database_path, + storage_dir=config.storage_dir, + max_upload_bytes=4, + ) + everos = FakeEverOSClient() + async with app_client(config, everos) as client: + user_key = await create_user(client) + response = await client.post( + "/resources", + data={"user_id": "u_123", "user_key": user_key}, + files={"file": ("big.txt", b"too large", "text/plain")}, + ) + + assert response.status_code == 413, response.text + assert repo.list_resources("u_123") == [] + assert not any(config.storage_dir.rglob("*")) + assert everos.add_calls == [] + + +@pytest.mark.asyncio +async def test_upload_rejects_unsupported_mime_type( + config: GatewayConfig, + repo: MemoryRepository, +) -> None: + everos = FakeEverOSClient() + async with app_client(config, everos) as client: + user_key = await create_user(client) + response = await client.post( + "/resources", + data={"user_id": "u_123", "user_key": user_key}, + files={"file": ("payload.bin", b"\x00\x01", "application/octet-stream")}, + ) + + assert response.status_code == 415, response.text + assert repo.list_resources("u_123") == [] + assert everos.add_calls == [] + + @pytest.mark.asyncio async def test_resource_detail_does_not_leak_internal_uri( config: GatewayConfig, @@ -298,6 +443,10 @@ async def test_deleted_resource_is_excluded_from_resource_scope_search( assert search_response.status_code == 200 assert everos.search_calls == [] assert search_response.json()["results"] == [] + repo = MemoryRepository(config.database_path) + deleted = repo.get_resource(resource_id) + assert deleted is not None + assert not Path(deleted["uri"].removeprefix("file://")).exists() @pytest.mark.asyncio @@ -305,24 +454,7 @@ async def test_tombstone_filters_search_results( config: GatewayConfig, repo: MemoryRepository, ) -> None: - repo.create_resource( - id="r_1", - user_id="u_123", - app_id="default", - project_id="default", - session_id="resource:u_123:r_1", - original_filename="a.txt", - mime_type="text/plain", - content_type="text", - uri="file:///private/a.txt", - uri_public=False, - sha256="abc", - size_bytes=1, - title=None, - description=None, - status="extracted", - error_message=None, - ) + create_test_resource(repo, resource_id="r_1", user_id="u_123") repo.add_tombstone( user_id="u_123", memory_id="mem_deleted", @@ -358,24 +490,7 @@ async def test_override_replaces_search_result_text( config: GatewayConfig, repo: MemoryRepository, ) -> None: - repo.create_resource( - id="r_1", - user_id="u_123", - app_id="default", - project_id="default", - session_id="resource:u_123:r_1", - original_filename="a.txt", - mime_type="text/plain", - content_type="text", - uri="file:///private/a.txt", - uri_public=False, - sha256="abc", - size_bytes=1, - title=None, - description=None, - status="extracted", - error_message=None, - ) + create_test_resource(repo, resource_id="r_1", user_id="u_123") everos = FakeEverOSClient( [{"id": "mem_1", "session_id": "resource:u_123:r_1", "episode": "old text"}] ) @@ -407,6 +522,52 @@ async def test_override_replaces_search_result_text( assert result["raw"]["episode"] == "old text" +@pytest.mark.asyncio +async def test_memory_override_rejects_session_owned_by_another_user( + config: GatewayConfig, + repo: MemoryRepository, +) -> None: + create_test_resource(repo, resource_id="r_bob", user_id="bob") + everos = FakeEverOSClient() + async with app_client(config, everos) as client: + user_key = await create_user(client, "alice") + response = await client.patch( + "/memories/mem_1", + json={ + "user_id": "alice", + "user_key": user_key, + "session_id": "resource:bob:r_bob", + "override_text": "nope", + }, + ) + + assert response.status_code == 403, response.text + assert repo.get_active_overrides("alice") == [] + + +@pytest.mark.asyncio +async def test_memory_delete_requires_owned_session( + config: GatewayConfig, + repo: MemoryRepository, +) -> None: + everos = FakeEverOSClient() + async with app_client(config, everos) as client: + user_key = await create_user(client, "u_123") + response = await client.request( + "DELETE", + "/memories/mem_1", + json={ + "user_id": "u_123", + "user_key": user_key, + "session_id": "resource:u_123:r_missing", + "reason": "manual delete", + }, + ) + + assert response.status_code == 403, response.text + assert repo.get_tombstones("u_123") == [] + + @pytest.mark.asyncio async def test_list_resources_returns_only_not_deleted( config: GatewayConfig, @@ -445,6 +606,7 @@ async def test_delete_memory_writes_tombstone( config: GatewayConfig, repo: MemoryRepository, ) -> None: + create_test_resource(repo, resource_id="r_1", user_id="u_123") everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client)