"""Tests for ``MemorySystemService`` driven by in-memory fake backends.

The fakes record every call they receive in ``self.calls`` so the tests can
assert exactly which backend operations the service performed and with which
arguments.
"""

import asyncio

from memory_system_api.schemas import MessageIngestRequest, SearchRequest, SessionContextRequest
from memory_system_api.service import MemorySystemService


class FakeOpenViking:
    """In-memory stand-in for the OpenViking backend.

    Every method appends a tuple describing the call to ``self.calls`` and
    returns a canned payload.
    """

    def __init__(self, fail_on_append: bool = False):
        # When true, append_message raises after recording the call.
        self.fail_on_append = fail_on_append
        # Chronological log of (method_name, *args) tuples.
        self.calls = []

    async def create_user(self, user_id: str) -> dict:
        """Record the call and return an account payload derived from ``user_id``."""
        self.calls.append(("create_user", user_id))
        return {
            "account_id": f"{user_id}_account",
            "admin_user_id": user_id,
            "user_key": f"{user_id}-key",
        }

    def credential_for_user(
        self,
        user_id: str,
        user_key: str,
        agent_id: str | None = None,
    ) -> str:
        """Return ``"key-<user_id>"`` when ``user_key`` equals ``"<user_id>-key"``.

        Raises:
            PermissionError: if ``user_key`` does not match the expected value.
        """
        self.calls.append(("credential_for_user", user_id, user_key, agent_id))
        if user_key != f"{user_id}-key":
            raise PermissionError("Invalid user key")
        return f"key-{user_id}"

    async def ensure_session(self, user_key: str, session_id: str) -> dict:
        """Record the call and echo the session id back."""
        self.calls.append(("ensure_session", user_key, session_id))
        return {"session_id": session_id}

    async def append_message(self, user_key: str, session_id: str, role: str, content: str) -> dict:
        """Record the message; return how many append calls have been recorded.

        Raises:
            RuntimeError: if the fake was built with ``fail_on_append=True``.
        """
        self.calls.append(("append_message", user_key, session_id, role, content))
        if self.fail_on_append:
            raise RuntimeError("openviking append failed")
        # Count recorded append calls (including this one) without building a list.
        appended = sum(1 for call in self.calls if call[0] == "append_message")
        return {"message_count": appended}

    async def find(self, user_key: str, query: str, limit: int) -> dict:
        """Record the call, yield to the event loop briefly, return one item."""
        self.calls.append(("find", user_key, query, limit))
        await asyncio.sleep(0.01)
        return {"items": [{"source": "openviking-find"}]}

    async def search(
        self,
        user_key: str,
        query: str,
        limit: int,
        level: int = 2,
        score_threshold: float = 0.8,
        target_uri: str = "viking://user/memories",
    ) -> dict:
        """Record the call with all search parameters and return one item."""
        self.calls.append(("search", user_key, query, limit, level, score_threshold, target_uri))
        await asyncio.sleep(0.01)
        return {"items": [{"source": "openviking-search"}]}

    async def search_profile_memories(self, user_key: str, query: str, limit: int, level: int) -> dict:
        """Record the call and return a fixed single-memory search result."""
        self.calls.append(("search_profile_memories", user_key, query, limit, level))
        return {
            "status": "ok",
            "result": {
                "memories": [
                    {
                        "context_type": "memory",
                        "uri": "viking://user/tom/memories/preferences/coffee.md",
                        "level": 2,
                        "score": 0.91,
                        "abstract": "用户喜欢喝咖啡。",
                    }
                ],
                "resources": [],
                "skills": [],
                "total": 1,
            },
        }

    async def get_session_context(self, user_key: str, session_id: str) -> dict:
        """Record the call and return a fixed session-context payload."""
        self.calls.append(("get_session_context", user_key, session_id))
        return {
            "status": "ok",
            "result": {
                "latest_archive_overview": "# Working Memory\nUser likes coffee.",
                "pre_archive_abstracts": [],
                "messages": [],
                "estimatedTokens": 42,
                "stats": {"totalArchives": 1},
            },
        }

    async def commit_session(self, user_key: str, session_id: str) -> dict:
        """Record the call and return a fixed commit result."""
        self.calls.append(("commit_session", user_key, session_id))
        return {"status": "ok", "result": {"task_id": "task-1", "archive_uri": "archive-1"}}


class FakeEverOS:
    """In-memory stand-in for the EverOS backend; records calls in ``self.calls``."""

    def __init__(self, fail_on_append: bool = False):
        # When true, append_message raises after recording the call.
        self.fail_on_append = fail_on_append
        # Chronological log of (method_name, *args) tuples.
        self.calls = []

    async def append_message(self, user_id: str, session_id: str, role: str, content: str) -> dict:
        """Record the message and report it as accumulated.

        Raises:
            RuntimeError: if the fake was built with ``fail_on_append=True``.
        """
        self.calls.append(("append_message", user_id, session_id, role, content))
        if self.fail_on_append:
            raise RuntimeError("everos append failed")
        return {"status": "accumulated"}

    async def search(self, user_id: str, session_id: str | None, query: str, method: str, limit: int) -> dict:
        """Record the call and return one item whose source names ``method``."""
        self.calls.append(("search", user_id, session_id, query, method, limit))
        await asyncio.sleep(0.01)
        return {"items": [{"source": f"everos-{method}"}]}

    async def flush(self, user_id: str, session_id: str) -> dict:
        """Record the call and report the session as flushed."""
        self.calls.append(("flush", user_id, session_id))
        return {"status": "flushed"}

    async def get_profile(self, user_id: str) -> dict:
        """Record the call and return a fixed payload containing one profile."""
        self.calls.append(("get_profile", user_id))
        return {
            "data": {
                "episodes": [],
                "profiles": [
                    {
                        "id": "profile-1",
                        "user_id": user_id,
                        "profile_data": {"summary": "喜欢咖啡"},
                    }
                ],
                "agent_cases": [],
                "agent_skills": [],
                "total_count": 1,
                "count": 1,
            }
        }


class FakeEverOSWithVector(FakeEverOS):
    """EverOS fake whose search payload carries ``vector`` keys at several depths."""

    async def search(self, user_id: str, session_id: str | None, query: str, method: str, limit: int) -> dict:
        """Record the call and return an episode payload containing vectors."""
        self.calls.append(("search", user_id, session_id, query, method, limit))
        return {
            "data": {
                "episodes": [{"id": "episode-1", "vector": [0.1, 0.2]}],
                "original_data": {
                    "episodes": {
                        "episode-1": {
                            "summary": "喜欢拿铁",
                            "vector": [0.1, 0.2],
                            "nested": {"vector": [0.3]},
                        }
                    }
                },
            }
        }


class FakeEverOSVerbose(FakeEverOS):
    """EverOS fake whose search payload includes query echo and ``original_data``."""

    async def search(self, user_id: str, session_id: str | None, query: str, method: str, limit: int) -> dict:
        """Record the call and return a fully populated single-episode payload."""
        self.calls.append(("search", user_id, session_id, query, method, limit))
        return {
            "data": {
                "episodes": [
                    {
                        "id": "episode-1",
                        "user_id": user_id,
                        "session_id": session_id,
                        "timestamp": "2026-05-22T07:50:51.750000Z",
                        "summary": "userB 在对话中表示自己喜欢拿铁。",
                        "subject": "UserB 表达对拿铁的喜好",
                        "episode": "userB 在对话中表示自己喜欢拿铁。",
                        "type": "Conversation",
                        "parent_id": "parent-1",
                        "score": 0.72,
                        "atomic_facts": [],
                    }
                ],
                "profiles": [],
                "raw_messages": [],
                "query": {
                    "text": query,
                    "method": method,
                    "filters_applied": {"user_id": user_id, "session_id": session_id},
                },
                "original_data": {
                    "episodes": {
                        "episode-1": {
                            "id": "episode-1",
                            "summary": "userB 在对话中表示自己喜欢拿铁。",
                            "episode": "userB 在对话中表示自己喜欢拿铁。",
                            "vector_model": "Qwen3-VL-Embedding-2B",
                        }
                    }
                },
            }
        }


def test_capture_includes_exception_type_when_message_is_empty():
    """An exception with an empty message is reported by its class name."""
    service = MemorySystemService(openviking=FakeOpenViking(), everos=FakeEverOS())

    class EmptyError(Exception):
        pass

    async def fail():
        raise EmptyError()

    response = asyncio.run(service._capture(fail))

    assert response.status == "failed"
    assert response.error == "EmptyError"


def test_create_user_delegates_to_openviking_only():
    """Creating a user calls OpenViking once and never touches EverOS."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(service.create_user("alice"))

    assert response.status == "success"
    assert response.account == {
        "account_id": "alice_account",
        "admin_user_id": "alice",
        "user_key": "alice-key",
    }
    assert openviking.calls == [("create_user", "alice")]
    assert everos.calls == []


def test_search_removes_vectors_from_items_and_backend_results():
    """``vector`` keys are absent from both the items and the EverOS result."""
    service = MemorySystemService(openviking=FakeOpenViking(), everos=FakeEverOSWithVector())

    response = asyncio.run(
        service.search(
            SearchRequest(
                user_id="tom",
                user_key="tom-key",
                session_id="sess-1",
                query="咖啡偏好",
                use_llm=False,
                limit=5,
            ),
        )
    )

    assert response.items == [
        {"source_backend": "openviking", "source": "openviking-search"},
        {"source_backend": "everos", "memory_type": "episode", "id": "episode-1"},
    ]
    assert not _has_key(response.backends["everos"].result, "vector")


def test_search_returns_compact_items_and_backend_diagnostics_without_duplicate_raw_payloads():
    """Items are compacted and the EverOS result holds only counts and the query."""
    service = MemorySystemService(openviking=FakeOpenViking(), everos=FakeEverOSVerbose())

    response = asyncio.run(
        service.search(
            SearchRequest(
                user_id="tom",
                user_key="tom-key",
                session_id="sess-1",
                query="我喜欢喝什么?",
                use_llm=True,
            ),
        )
    )

    assert response.items == [
        {"source_backend": "openviking", "source": "openviking-search"},
        {
            "source_backend": "everos",
            "memory_type": "episode",
            "id": "episode-1",
            "user_id": "tom",
            "session_id": "sess-1",
            "timestamp": "2026-05-22T07:50:51.750000Z",
            "summary": "userB 在对话中表示自己喜欢拿铁。",
            "score": 0.72,
        },
    ]
    assert response.backends["everos"].result == {
        "counts": {"episodes": 1, "profiles": 0, "raw_messages": 0},
        "query": {
            "text": "我喜欢喝什么?",
            "method": "agentic",
            "filters_applied": {"user_id": "tom", "session_id": "sess-1"},
        },
    }
    assert not _has_key(response.backends["everos"].result, "original_data")


def test_session_context_combines_openviking_context_and_everos_search_items():
    """Session context comes from OpenViking; items come from an EverOS hybrid search."""
    openviking = FakeOpenViking()
    everos = FakeEverOSVerbose()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(
        service.get_session_context(
            "sess-1",
            SessionContextRequest(user_id="tom", user_key="tom-key", query="我喜欢喝什么?", limit=5),
        )
    )

    assert response.status == "success"
    assert response.context == {
        "latest_archive_overview": "# Working Memory\nUser likes coffee.",
        "pre_archive_abstracts": [],
        "messages": [],
        "estimatedTokens": 42,
        "stats": {"totalArchives": 1},
    }
    assert response.items == [
        {
            "source_backend": "everos",
            "memory_type": "episode",
            "id": "episode-1",
            "user_id": "tom",
            "session_id": "sess-1",
            "timestamp": "2026-05-22T07:50:51.750000Z",
            "summary": "userB 在对话中表示自己喜欢拿铁。",
            "score": 0.72,
        }
    ]
    assert ("credential_for_user", "tom", "tom-key", "sess-1") in openviking.calls
    assert ("get_session_context", "key-tom", "sess-1") in openviking.calls
    assert ("search", "tom", "sess-1", "我喜欢喝什么?", "hybrid", 5) in everos.calls


def test_profile_combines_everos_profile_and_openviking_memory_search():
    """Profile data comes from EverOS; items come from OpenViking memory search."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(service.get_profile("tom", "tom-key", query="我想喝东西", limit=10, level=2))

    assert response.status == "success"
    assert response.profile == {
        "data": {
            "episodes": [],
            "profiles": [
                {
                    "id": "profile-1",
                    "user_id": "tom",
                    "profile_data": {"summary": "喜欢咖啡"},
                }
            ],
            "agent_cases": [],
            "agent_skills": [],
            "total_count": 1,
            "count": 1,
        }
    }
    assert response.items == [
        {
            "source_backend": "openviking",
            "context_type": "memory",
            "uri": "viking://user/tom/memories/preferences/coffee.md",
            "level": 2,
            "score": 0.91,
            "abstract": "用户喜欢喝咖啡。",
        }
    ]
    assert response.backends["everos"].result == {
        "total_count": 1,
        "count": 1,
        "counts": {
            "episodes": 0,
            "profiles": 1,
            "agent_cases": 0,
            "agent_skills": 0,
        },
    }
    assert "profiles" not in response.backends["everos"].result
    # No session is involved here, so the recorded agent_id is None.
    assert ("credential_for_user", "tom", "tom-key", None) in openviking.calls
    assert ("search_profile_memories", "key-tom", "我想喝东西", 10, 2) in openviking.calls
    assert everos.calls == [("get_profile", "tom")]


def _has_key(value, key: str) -> bool:
    """Return True if ``key`` appears as a dict key anywhere inside ``value``.

    Recurses through dict values and list elements; any other type is a leaf.
    """
    if isinstance(value, dict):
        return key in value or any(_has_key(item, key) for item in value.values())
    if isinstance(value, list):
        return any(_has_key(item, key) for item in value)
    return False


def test_ingest_splits_user_and_assistant_messages():
    """User and assistant messages are appended separately to both backends."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(
        service.ingest_messages(
            MessageIngestRequest(
                user_id="tom",
                user_key="tom-key",
                session_id="sess-1",
                user_message="我喜欢拿铁",
                assistant_message="我记住了",
            )
        )
    )

    assert response.status == "success"
    assert response.message_count == 2
    assert openviking.calls == [
        ("credential_for_user", "tom", "tom-key", "sess-1"),
        ("ensure_session", "key-tom", "sess-1"),
        ("append_message", "key-tom", "sess-1", "user", "我喜欢拿铁"),
        ("append_message", "key-tom", "sess-1", "assistant", "我记住了"),
    ]
    assert everos.calls == [
        ("append_message", "tom", "sess-1", "user", "我喜欢拿铁"),
        ("append_message", "tom", "sess-1", "assistant", "我记住了"),
    ]


def test_ingest_requires_at_least_one_message():
    """Ingesting a request with no messages raises ``ValueError``."""
    service = MemorySystemService(openviking=FakeOpenViking(), everos=FakeEverOS())

    try:
        asyncio.run(
            service.ingest_messages(
                MessageIngestRequest(user_id="tom", user_key="tom-key", session_id="sess-1"),
            )
        )
    except ValueError as exc:
        assert "at least one message" in str(exc)
    else:
        # Reaching here means no exception was raised at all.
        raise AssertionError("expected ValueError")


def test_ingest_returns_partial_success_when_one_backend_fails():
    """A failing OpenViking append yields ``partial_success`` overall."""
    service = MemorySystemService(openviking=FakeOpenViking(fail_on_append=True), everos=FakeEverOS())

    response = asyncio.run(
        service.ingest_messages(
            MessageIngestRequest(user_id="tom", user_key="tom-key", session_id="sess-1", user_message="hello"),
        )
    )

    assert response.status == "partial_success"
    assert response.backends["openviking"].status == "failed"
    assert response.backends["everos"].status == "success"


def test_commit_uses_user_key_without_account_id():
    """Commit resolves the credential, commits on OpenViking and flushes EverOS."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(service.commit_session("tom", "tom-key", "sess-1"))

    assert response.status == "success"
    assert openviking.calls == [
        ("credential_for_user", "tom", "tom-key", "sess-1"),
        ("commit_session", "key-tom", "sess-1"),
    ]
    assert everos.calls == [("flush", "tom", "sess-1")]


def test_search_uses_openviking_search_and_hybrid_without_llm():
    """Without the LLM, EverOS is searched with ``hybrid`` and custom options are forwarded."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(
        service.search(
            SearchRequest(
                user_id="tom",
                user_key="tom-key",
                session_id="sess-1",
                query="咖啡偏好",
                use_llm=False,
                limit=5,
                level=3,
                score_threshold=0.7,
                target_uri="viking://user/custom/memories",
            ),
        )
    )

    assert response.status == "success"
    assert response.items == [
        {"source_backend": "openviking", "source": "openviking-search"},
        {"source_backend": "everos", "source": "everos-hybrid"},
    ]
    assert ("credential_for_user", "tom", "tom-key", "sess-1") in openviking.calls
    assert ("search", "key-tom", "咖啡偏好", 5, 3, 0.7, "viking://user/custom/memories") in openviking.calls
    assert ("search", "tom", "sess-1", "咖啡偏好", "hybrid", 5) in everos.calls


def test_search_uses_search_and_agentic_with_llm():
    """With the LLM, EverOS is searched with ``agentic``; OpenViking gets default options."""
    openviking = FakeOpenViking()
    everos = FakeEverOS()
    service = MemorySystemService(openviking=openviking, everos=everos)

    response = asyncio.run(
        service.search(
            SearchRequest(
                user_id="tom",
                user_key="tom-key",
                session_id="sess-1",
                query="咖啡偏好",
                use_llm=True,
                limit=5,
            ),
        )
    )

    assert response.status == "success"
    assert response.items == [
        {"source_backend": "openviking", "source": "openviking-search"},
        {"source_backend": "everos", "source": "everos-agentic"},
    ]
    assert ("credential_for_user", "tom", "tom-key", "sess-1") in openviking.calls
    assert ("search", "key-tom", "咖啡偏好", 5, 2, 0.8, "viking://user/memories") in openviking.calls
    assert ("search", "tom", "sess-1", "咖啡偏好", "agentic", 5) in everos.calls