import asyncio

from memory_system_api.schemas import MessageIngestRequest, SearchRequest
from memory_system_api.service import MemorySystemService


class FakeOpenViking:
    """In-memory stand-in for the OpenViking backend used by the tests below.

    Every method appends a tuple to ``calls`` whose first element is the
    method name and whose remaining elements are the arguments received,
    so a test can assert on the exact call sequence.
    """

    def __init__(self, fail_on_append: bool = False):
        """Start with an empty call log.

        Args:
            fail_on_append: When true, ``append_message`` raises
                ``RuntimeError`` after logging the call.
        """
        self.calls = []
        self.fail_on_append = fail_on_append

    async def ensure_user(self, user_id: str) -> str:
        """Log the call and return the deterministic key ``key-<user_id>``."""
        entry = ("ensure_user", user_id)
        self.calls.append(entry)
        return f"key-{user_id}"

    async def ensure_session(self, user_key: str, session_id: str) -> dict:
        """Log the call and echo ``session_id`` back in a dict."""
        entry = ("ensure_session", user_key, session_id)
        self.calls.append(entry)
        return {"session_id": session_id}

    async def append_message(self, user_key: str, session_id: str, role: str, content: str) -> dict:
        """Log the call, then fail or report how many appends were logged.

        Raises:
            RuntimeError: If the fake was built with ``fail_on_append=True``.
        """
        # The call is logged before the failure check, so a failed append
        # still shows up in ``calls``.
        self.calls.append(("append_message", user_key, session_id, role, content))
        if self.fail_on_append:
            raise RuntimeError("openviking append failed")
        appended = sum(1 for entry in self.calls if entry[0] == "append_message")
        return {"message_count": appended}

    async def find(self, user_key: str, user_id: str, query: str, limit: int) -> dict:
        """Log the call, suspend for 10 ms, and return one canned ``find`` item."""
        self.calls.append(("find", user_key, user_id, query, limit))
        await asyncio.sleep(0.01)
        canned_item = {"source": "openviking-find"}
        return {"items": [canned_item]}

    async def search(self, user_key: str, session_id: str | None, query: str, limit: int) -> dict:
        """Log the call, suspend for 10 ms, and return one canned ``search`` item."""
        self.calls.append(("search", user_key, session_id, query, limit))
        await asyncio.sleep(0.01)
        canned_item = {"source": "openviking-search"}
        return {"items": [canned_item]}


class FakeEverOS:
    """In-memory stand-in for the EverOS backend used by the tests below.

    Like ``FakeOpenViking``, each method appends a ``(method_name, *args)``
    tuple to ``calls`` before doing anything else.
    """

    def __init__(self, fail_on_append: bool = False):
        """Start with an empty call log.

        Args:
            fail_on_append: When true, ``append_message`` raises
                ``RuntimeError`` after logging the call.
        """
        self.calls = []
        self.fail_on_append = fail_on_append

    async def append_message(self, user_id: str, session_id: str, role: str, content: str) -> dict:
        """Log the call, then either fail or return a fixed status payload.

        Raises:
            RuntimeError: If the fake was built with ``fail_on_append=True``.
        """
        self.calls.append(("append_message", user_id, session_id, role, content))
        if self.fail_on_append:
            raise RuntimeError("everos append failed")
        return {"status": "accumulated"}

    async def search(self, user_id: str, session_id: str | None, query: str, method: str, limit: int) -> dict:
        """Log the call, suspend for 10 ms, and return one item tagged with ``method``."""
        self.calls.append(("search", user_id, session_id, query, method, limit))
        await asyncio.sleep(0.01)
        canned_item = {"source": f"everos-{method}"}
        return {"items": [canned_item]}
def test_ingest_splits_user_and_assistant_messages():
    """One request carrying both messages yields two role-tagged appends per backend."""
    viking_fake = FakeOpenViking()
    everos_fake = FakeEverOS()
    service = MemorySystemService(openviking=viking_fake, everos=everos_fake)
    request = MessageIngestRequest(
        user_id="tom",
        session_id="sess-1",
        user_message="我喜欢拿铁",
        assistant_message="我记住了",
    )

    result = asyncio.run(service.ingest_messages(request))

    assert result.status == "success"
    assert result.message_count == 2

    # Expected order on both backends: the user turn first, then the assistant turn.
    turns = (("user", "我喜欢拿铁"), ("assistant", "我记住了"))
    setup_calls = [
        ("ensure_user", "tom"),
        ("ensure_session", "key-tom", "sess-1"),
    ]
    assert viking_fake.calls == setup_calls + [
        ("append_message", "key-tom", "sess-1", role, text) for role, text in turns
    ]
    assert everos_fake.calls == [
        ("append_message", "tom", "sess-1", role, text) for role, text in turns
    ]


def test_ingest_requires_at_least_one_message():
    """A request with neither message must end in a ValueError mentioning the rule."""
    service = MemorySystemService(openviking=FakeOpenViking(), everos=FakeEverOS())

    caught = None
    try:
        # The request is built inside the ``try`` on purpose: SOURCE does not
        # show whether the ValueError comes from request construction or from
        # the service call, and either one has to be accepted.
        empty_request = MessageIngestRequest(user_id="tom", session_id="sess-1")
        asyncio.run(service.ingest_messages(empty_request))
    except ValueError as exc:
        caught = exc

    if caught is None:
        raise AssertionError("expected ValueError")
    assert "at least one message" in str(caught)


def test_ingest_returns_partial_success_when_one_backend_fails():
    """A failing OpenViking append downgrades the result to partial success."""
    failing_viking = FakeOpenViking(fail_on_append=True)
    service = MemorySystemService(openviking=failing_viking, everos=FakeEverOS())
    request = MessageIngestRequest(user_id="tom", session_id="sess-1", user_message="hello")

    result = asyncio.run(service.ingest_messages(request))

    assert result.status == "partial_success"
    backend_reports = result.backends
    assert backend_reports["openviking"].status == "failed"
    assert backend_reports["everos"].status == "success"


def test_search_uses_find_and_hybrid_without_llm():
    """With ``use_llm=False`` the service uses OpenViking ``find`` and EverOS ``hybrid``."""
    viking_fake = FakeOpenViking()
    everos_fake = FakeEverOS()
    service = MemorySystemService(openviking=viking_fake, everos=everos_fake)
    request = SearchRequest(
        user_id="tom",
        session_id="sess-1",
        query="咖啡偏好",
        use_llm=False,
        limit=5,
    )

    result = asyncio.run(service.search(request))

    assert result.status == "success"
    # Items are expected in backend order, each tagged with its backend of origin.
    expected_items = [
        {"source_backend": "openviking", "source": "openviking-find"},
        {"source_backend": "everos", "source": "everos-hybrid"},
    ]
    assert result.items == expected_items

    expected_viking_call = ("find", "key-tom", "tom", "咖啡偏好", 5)
    expected_everos_call = ("search", "tom", "sess-1", "咖啡偏好", "hybrid", 5)
    assert expected_viking_call in viking_fake.calls
    assert expected_everos_call in everos_fake.calls


def test_search_uses_search_and_agentic_with_llm():
    """With ``use_llm=True`` the service uses OpenViking ``search`` and EverOS ``agentic``."""
    viking_fake = FakeOpenViking()
    everos_fake = FakeEverOS()
    service = MemorySystemService(openviking=viking_fake, everos=everos_fake)
    request = SearchRequest(
        user_id="tom",
        session_id="sess-1",
        query="咖啡偏好",
        use_llm=True,
        limit=5,
    )

    result = asyncio.run(service.search(request))

    assert result.status == "success"
    # Items are expected in backend order, each tagged with its backend of origin.
    expected_items = [
        {"source_backend": "openviking", "source": "openviking-search"},
        {"source_backend": "everos", "source": "everos-agentic"},
    ]
    assert result.items == expected_items

    expected_viking_call = ("search", "key-tom", "sess-1", "咖啡偏好", 5)
    expected_everos_call = ("search", "tom", "sess-1", "咖啡偏好", "agentic", 5)
    assert expected_viking_call in viking_fake.calls
    assert expected_everos_call in everos_fake.calls