from __future__ import annotations from pathlib import Path from typing import Any import httpx import pytest from core.api import create_app from core.config import GatewayConfig from core.db import init_db from core.repository import MemoryRepository class FakeEverOSClient: def __init__( self, search_results: list[dict[str, Any]] | None = None, health_error: Exception | None = None, add_failures: int = 0, flush_failures: int = 0, ) -> None: self.add_calls: list[dict[str, Any]] = [] self.flush_calls: list[dict[str, str]] = [] self.search_calls: list[dict[str, Any]] = [] self.search_results = search_results or [] self.health_error = health_error self.add_failures = add_failures self.flush_failures = flush_failures async def add_memory(self, payload: dict[str, Any]) -> dict[str, Any]: self.add_calls.append(payload) if self.add_failures > 0: self.add_failures -= 1 raise RuntimeError("temporary add failure") return {"request_id": "add", "data": {"status": "accumulated"}} async def flush_memory( self, session_id: str, app_id: str, project_id: str, ) -> dict[str, Any]: self.flush_calls.append( {"session_id": session_id, "app_id": app_id, "project_id": project_id} ) if self.flush_failures > 0: self.flush_failures -= 1 raise RuntimeError("temporary flush failure") return {"request_id": "flush", "data": {"status": "extracted"}} async def search_memory(self, payload: dict[str, Any]) -> dict[str, Any]: self.search_calls.append(payload) return {"request_id": "search", "data": {"episodes": self.search_results}} async def health_check(self) -> dict[str, Any]: if self.health_error is not None: raise self.health_error return {"status": "ok"} @pytest.fixture def config(tmp_path: Path) -> GatewayConfig: return GatewayConfig( everos_base_url="http://everos.test", database_path=tmp_path / "gateway.sqlite3", storage_dir=tmp_path / "storage", ) @pytest.fixture def repo(config: GatewayConfig) -> MemoryRepository: init_db(config.database_path) return MemoryRepository(config.database_path) def app_client( config: GatewayConfig, everos_client: FakeEverOSClient, ) -> httpx.AsyncClient: app = create_app(config=config, everos_client=everos_client) transport = httpx.ASGITransport(app=app) return httpx.AsyncClient(transport=transport, base_url="http://test") def create_test_resource( repo: MemoryRepository, *, resource_id: str, user_id: str, uri: str = "file:///private/a.txt", ) -> None: repo.create_resource( id=resource_id, user_id=user_id, app_id="default", project_id="default", session_id=f"resource:{user_id}:{resource_id}", original_filename="a.txt", mime_type="text/plain", content_type="text", uri=uri, uri_public=False, sha256=f"sha-{resource_id}", size_bytes=1, title=None, description=None, status="extracted", error_message=None, ) async def create_user(client: httpx.AsyncClient, user_id: str = "u_123") -> str: response = await client.post("/users", json={"user_id": user_id}) assert response.status_code == 200, response.text body = response.json() assert body["user_id"] == user_id assert body["user_key"] return body["user_key"] def test_create_app_uses_configured_everos_timeout(config: GatewayConfig) -> None: config = GatewayConfig( everos_base_url=config.everos_base_url, database_path=config.database_path, storage_dir=config.storage_dir, everos_timeout_seconds=7.5, ) app = create_app(config=config) assert app.state.everos_client.timeout == 7.5 @pytest.mark.asyncio async def test_health_reports_api_and_everos_ok( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: response = await client.get("/health") assert response.status_code == 200, response.text assert response.json() == { "status": "ok", "api": {"status": "ok"}, "everos": { "status": "ok", "base_url": "http://everos.test", "data": {"status": "ok"}, }, } @pytest.mark.asyncio async def test_health_reports_degraded_when_everos_fails( config: GatewayConfig, ) -> None: everos = FakeEverOSClient(health_error=RuntimeError("everos down")) async with app_client(config, everos) as client: response = await client.get("/health") assert response.status_code == 200, response.text body = response.json() assert body["status"] == "degraded" assert body["api"] == {"status": "ok"} assert body["everos"]["status"] == "unavailable" assert body["everos"]["base_url"] == "http://everos.test" assert body["everos"]["error"] == "everos down" @pytest.mark.asyncio async def test_create_user_generates_and_persists_user_key( config: GatewayConfig, repo: MemoryRepository, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client, "u_123") user = repo.get_user("u_123") assert user is not None assert user["user_key"] == user_key @pytest.mark.asyncio async def test_upload_resource_creates_record_and_calls_everos( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key, "title": "Contract"}, files={"file": ("contract.txt", b"pay in 30 days", "text/plain")}, ) assert response.status_code == 200, response.text body = response.json() resource_id = body["resource_id"] assert body["session_id"] == f"resource:u_123:{resource_id}" assert body["uri"] == f"resource://u_123/{resource_id}" assert body["status"] == "extracted" repo = MemoryRepository(config.database_path) resource = repo.get_resource(resource_id) assert resource is not None assert resource["status"] == "extracted" assert resource["original_filename"] == "contract.txt" assert resource["content_type"] == "text" assert resource["sha256"] assert resource["size_bytes"] == len(b"pay in 30 days") assert not resource["uri"].startswith("resource://") assert len(everos.add_calls) == 1 add_payload = everos.add_calls[0] assert add_payload["session_id"] == f"resource:u_123:{resource_id}" content = add_payload["messages"][0]["content"][0] assert content["type"] == "text" assert content["uri"].startswith("file://") assert content["extras"] == {"resource_id": resource_id, "source": "user_upload"} assert everos.flush_calls == [ { "session_id": f"resource:u_123:{resource_id}", "app_id": "default", "project_id": "default", } ] @pytest.mark.asyncio async def test_upload_retries_transient_everos_failure( config: GatewayConfig, ) -> None: config = GatewayConfig( everos_base_url=config.everos_base_url, database_path=config.database_path, storage_dir=config.storage_dir, everos_ingest_attempts=2, everos_retry_delay_seconds=0, ) everos = FakeEverOSClient(add_failures=1, flush_failures=1) async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("retry.txt", b"retry me", "text/plain")}, ) assert response.status_code == 200, response.text assert response.json()["status"] == "extracted" assert len(everos.add_calls) == 2 assert len(everos.flush_calls) == 2 @pytest.mark.asyncio async def test_upload_duplicate_resource_is_idempotent_for_same_user( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) first = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("same.txt", b"same bytes", "text/plain")}, ) second = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("same.txt", b"same bytes", "text/plain")}, ) assert first.status_code == 200, first.text assert second.status_code == 200, second.text assert second.json()["resource_id"] == first.json()["resource_id"] assert len(everos.add_calls) == 1 assert len(everos.flush_calls) == 1 @pytest.mark.asyncio async def test_upload_rejects_file_larger_than_configured_limit( config: GatewayConfig, repo: MemoryRepository, ) -> None: config = GatewayConfig( everos_base_url=config.everos_base_url, database_path=config.database_path, storage_dir=config.storage_dir, max_upload_bytes=4, ) everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("big.txt", b"too large", "text/plain")}, ) assert response.status_code == 413, response.text assert repo.list_resources("u_123") == [] assert not any(config.storage_dir.rglob("*")) assert everos.add_calls == [] @pytest.mark.asyncio async def test_upload_rejects_unsupported_mime_type( config: GatewayConfig, repo: MemoryRepository, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("payload.bin", b"\x00\x01", "application/octet-stream")}, ) assert response.status_code == 415, response.text assert repo.list_resources("u_123") == [] assert everos.add_calls == [] @pytest.mark.asyncio async def test_resource_detail_does_not_leak_internal_uri( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) created = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("note.txt", b"hello", "text/plain")}, ) resource_id = created.json()["resource_id"] detail = await client.get( f"/resources/{resource_id}", params={"user_id": "u_123", "user_key": user_key}, ) assert detail.status_code == 200, detail.text resources = detail.json()["resources"] assert len(resources) == 1 assert resources[0]["uri"] == f"resource://u_123/{resource_id}" assert "file://" not in detail.text @pytest.mark.asyncio async def test_resource_detail_returns_empty_when_user_has_no_resource( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client, "u_empty") response = await client.get( "/resources/r_missing", params={"user_id": "u_empty", "user_key": user_key}, ) assert response.status_code == 200, response.text assert response.json() == {"resources": []} @pytest.mark.asyncio async def test_resources_are_isolated_by_user_key( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: alice_key = await create_user(client, "alice") bob_key = await create_user(client, "bob") created = await client.post( "/resources", data={"user_id": "alice", "user_key": alice_key}, files={"file": ("alice.txt", b"alice-only", "text/plain")}, ) resource_id = created.json()["resource_id"] bob_detail = await client.get( f"/resources/{resource_id}", params={"user_id": "bob", "user_key": bob_key}, ) bob_list = await client.get( "/resources", params={"user_id": "bob", "user_key": bob_key}, ) assert bob_detail.status_code == 200, bob_detail.text assert bob_detail.json() == {"resources": []} assert bob_list.status_code == 200, bob_list.text assert bob_list.json() == {"resources": []} @pytest.mark.asyncio async def test_resource_api_rejects_invalid_user_key( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: await create_user(client, "u_123") response = await client.get( "/resources", params={"user_id": "u_123", "user_key": "wrong"}, ) assert response.status_code == 401 @pytest.mark.asyncio async def test_deleted_resource_is_excluded_from_resource_scope_search( config: GatewayConfig, ) -> None: everos = FakeEverOSClient( [{"id": "mem_1", "session_id": "resource:u_123:r_live", "episode": "live"}] ) async with app_client(config, everos) as client: user_key = await create_user(client) created = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("note.txt", b"hello", "text/plain")}, ) resource_id = created.json()["resource_id"] delete_response = await client.delete( f"/resources/{resource_id}", params={"user_id": "u_123", "user_key": user_key}, ) search_response = await client.post( "/memories/search", json={ "user_id": "u_123", "user_key": user_key, "query": "hello", "scope": ["resources"], "top_k": 8, }, ) assert delete_response.status_code == 200 assert search_response.status_code == 200 assert everos.search_calls == [] assert search_response.json()["results"] == [] repo = MemoryRepository(config.database_path) deleted = repo.get_resource(resource_id) assert deleted is not None assert not Path(deleted["uri"].removeprefix("file://")).exists() @pytest.mark.asyncio async def test_tombstone_filters_search_results( config: GatewayConfig, repo: MemoryRepository, ) -> None: create_test_resource(repo, resource_id="r_1", user_id="u_123") repo.add_tombstone( user_id="u_123", memory_id="mem_deleted", session_id=None, reason="user deleted", ) everos = FakeEverOSClient( [ {"id": "mem_deleted", "session_id": "resource:u_123:r_1", "episode": "x"}, {"id": "mem_live", "session_id": "resource:u_123:r_1", "episode": "y"}, ] ) async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.post( "/memories/search", json={ "user_id": "u_123", "user_key": user_key, "query": "hello", "scope": ["resources"], }, ) assert response.status_code == 200, response.text results = response.json()["results"] assert [result["id"] for result in results] == ["mem_live"] @pytest.mark.asyncio async def test_override_replaces_search_result_text( config: GatewayConfig, repo: MemoryRepository, ) -> None: create_test_resource(repo, resource_id="r_1", user_id="u_123") everos = FakeEverOSClient( [{"id": "mem_1", "session_id": "resource:u_123:r_1", "episode": "old text"}] ) async with app_client(config, everos) as client: user_key = await create_user(client) patch_response = await client.patch( "/memories/mem_1", json={ "user_id": "u_123", "user_key": user_key, "session_id": "resource:u_123:r_1", "override_text": "corrected text", }, ) search_response = await client.post( "/memories/search", json={ "user_id": "u_123", "user_key": user_key, "query": "hello", "scope": ["resources"], }, ) assert patch_response.status_code == 200, patch_response.text result = search_response.json()["results"][0] assert result["id"] == "mem_1" assert result["text"] == "corrected text" assert result["raw"]["episode"] == "old text" @pytest.mark.asyncio async def test_memory_override_rejects_session_owned_by_another_user( config: GatewayConfig, repo: MemoryRepository, ) -> None: create_test_resource(repo, resource_id="r_bob", user_id="bob") everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client, "alice") response = await client.patch( "/memories/mem_1", json={ "user_id": "alice", "user_key": user_key, "session_id": "resource:bob:r_bob", "override_text": "nope", }, ) assert response.status_code == 403, response.text assert repo.get_active_overrides("alice") == [] @pytest.mark.asyncio async def test_memory_delete_requires_owned_session( config: GatewayConfig, repo: MemoryRepository, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client, "u_123") response = await client.request( "DELETE", "/memories/mem_1", json={ "user_id": "u_123", "user_key": user_key, "session_id": "resource:u_123:r_missing", "reason": "manual delete", }, ) assert response.status_code == 403, response.text assert repo.get_tombstones("u_123") == [] @pytest.mark.asyncio async def test_list_resources_returns_only_not_deleted( config: GatewayConfig, ) -> None: everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) first = await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("a.txt", b"a", "text/plain")}, ) await client.post( "/resources", data={"user_id": "u_123", "user_key": user_key}, files={"file": ("b.txt", b"b", "text/plain")}, ) await client.delete( f"/resources/{first.json()['resource_id']}", params={"user_id": "u_123", "user_key": user_key}, ) response = await client.get( "/resources", params={"user_id": "u_123", "user_key": user_key}, ) assert response.status_code == 200 items = response.json()["resources"] assert len(items) == 1 assert items[0]["filename"] == "b.txt" assert items[0]["uri"].startswith("resource://u_123/") @pytest.mark.asyncio async def test_delete_memory_writes_tombstone( config: GatewayConfig, repo: MemoryRepository, ) -> None: create_test_resource(repo, resource_id="r_1", user_id="u_123") everos = FakeEverOSClient() async with app_client(config, everos) as client: user_key = await create_user(client) response = await client.request( "DELETE", "/memories/mem_1", json={ "user_id": "u_123", "user_key": user_key, "session_id": "resource:u_123:r_1", "reason": "manual delete", }, ) assert response.status_code == 200, response.text tombstones = repo.get_tombstones("u_123") assert len(tombstones) == 1 assert tombstones[0]["memory_id"] == "mem_1" assert tombstones[0]["session_id"] == "resource:u_123:r_1"