"""Orchestration for the lightweight Memory System API.""" from __future__ import annotations import asyncio from typing import Any, Awaitable, Callable from urllib.parse import urlparse from .clients import EverOSMemorySystemClient, OpenVikingMemorySystemClient from .schemas import ( AccountResponse, BackendStatus, CommitResponse, ExtractResponse, MemoryOperationResponse, MemoryWriteRequest, MessageIngestRequest, MessageIngestResponse, ProfileResponse, ResourceMutationResponse, ResourceUploadRequest, SearchRequest, SearchResponse, SessionContextRequest, SessionContextResponse, ) class MemorySystemService: def __init__(self, openviking: Any | None = None, everos: Any | None = None) -> None: self.openviking = openviking or OpenVikingMemorySystemClient() self.everos = everos or EverOSMemorySystemClient() async def create_user(self, user_id: str) -> AccountResponse: backends = {"openviking": await self._capture(lambda: self.openviking.create_user(user_id))} account = backends["openviking"].result if backends["openviking"].status == "success" else None return AccountResponse(status=self._aggregate_status(backends), account=account, backends=backends) async def upload_resource(self, request: ResourceUploadRequest) -> ResourceMutationResponse: credential = self.openviking.credential_for_user(request.user_id, request.user_key) async def upload_openviking() -> dict[str, Any]: if self._is_remote_url(request.path): return await self.openviking.add_resource( credential, path=request.path, to=request.to, reason=request.reason, wait=request.wait, directly_upload_media=request.directly_upload_media, ) temp_upload = await self.openviking.upload_temp_file(credential, request.path) temp_file_id = self._temp_file_id_from_result(temp_upload) return await self.openviking.add_resource( credential, temp_file_id=temp_file_id, to=request.to, reason=request.reason, wait=request.wait, directly_upload_media=request.directly_upload_media, ) backends = {"openviking": await self._capture(upload_openviking)} resource = backends["openviking"].result if backends["openviking"].status == "success" else None return ResourceMutationResponse(status=self._aggregate_status(backends), resource=resource, backends=backends) async def delete_resource( self, user_id: str, user_key: str, uri: str, recursive: bool = True, ) -> ResourceMutationResponse: credential = self.openviking.credential_for_user(user_id, user_key) backends = { "openviking": await self._capture(lambda: self.openviking.delete_resource(credential, uri, recursive)), } resource = backends["openviking"].result if backends["openviking"].status == "success" else None return ResourceMutationResponse(status=self._aggregate_status(backends), resource=resource, backends=backends) async def list_memories( self, user_id: str, user_key: str, uri: str = "viking://user/memories", recursive: bool = True, ) -> MemoryOperationResponse: credential = self.openviking.credential_for_user(user_id, user_key) backends = { "openviking": await self._capture(lambda: self.openviking.list_memories(credential, uri, recursive)), } memory = backends["openviking"].result if backends["openviking"].status == "success" else None return MemoryOperationResponse(status=self._aggregate_status(backends), memory=memory, backends=backends) async def read_memory( self, user_id: str, user_key: str, uri: str, ) -> MemoryOperationResponse: credential = self.openviking.credential_for_user(user_id, user_key) backends = { "openviking": await self._capture(lambda: self.openviking.read_memory(credential, uri)), } memory = backends["openviking"].result if backends["openviking"].status == "success" else None return MemoryOperationResponse(status=self._aggregate_status(backends), memory=memory, backends=backends) async def write_memory(self, request: MemoryWriteRequest) -> MemoryOperationResponse: credential = self.openviking.credential_for_user(request.user_id, request.user_key) backends = { "openviking": await self._capture( lambda: self.openviking.write_memory( credential, uri=request.uri, content=request.content, mode=request.mode, wait=request.wait, ) ), } memory = backends["openviking"].result if backends["openviking"].status == "success" else None return MemoryOperationResponse(status=self._aggregate_status(backends), memory=memory, backends=backends) async def delete_memory( self, user_id: str, user_key: str, uri: str, recursive: bool = False, ) -> MemoryOperationResponse: credential = self.openviking.credential_for_user(user_id, user_key) backends = { "openviking": await self._capture(lambda: self.openviking.delete_memory(credential, uri, recursive)), } memory = backends["openviking"].result if backends["openviking"].status == "success" else None return MemoryOperationResponse(status=self._aggregate_status(backends), memory=memory, backends=backends) async def ingest_messages(self, request: MessageIngestRequest) -> MessageIngestResponse: messages = self._messages_from_request(request) if not messages: raise ValueError("at least one message is required") credential = self.openviking.credential_for_user( request.user_id, request.user_key, agent_id=request.session_id, ) await self.openviking.ensure_session(credential, request.session_id) async def write_openviking() -> list[dict[str, Any]]: results = [] for message in messages: results.append( await self.openviking.append_message(credential, request.session_id, message["role"], message["content"]) ) return results async def write_everos() -> list[dict[str, Any]]: results = [] for message in messages: results.append( await self.everos.append_message(request.user_id, request.session_id, message["role"], message["content"]) ) return results backends = await self._run_backends(openviking=write_openviking, everos=write_everos) return MessageIngestResponse( status=self._aggregate_status(backends), message_count=len(messages), backends=backends, ) async def commit_session(self, user_id: str, user_key: str, session_id: str) -> CommitResponse: credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id) async def commit_openviking() -> dict[str, Any]: return await self.openviking.commit_session(credential, session_id) async def flush_everos() -> dict[str, Any]: return await self.everos.flush(user_id, session_id) backends = await self._run_backends(openviking=commit_openviking, everos=flush_everos) return CommitResponse(status=self._aggregate_status(backends), backends=backends) async def extract_session(self, user_id: str, user_key: str, session_id: str) -> ExtractResponse: credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id) backends = { "openviking": await self._capture(lambda: self.openviking.extract_session(credential, session_id)), } return ExtractResponse(status=self._aggregate_status(backends), backends=backends) async def get_openviking_task( self, user_id: str, user_key: str, task_id: str, session_id: str | None = None, ) -> dict[str, Any]: credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id) return await self.openviking.get_task(credential, task_id) async def search(self, request: SearchRequest) -> SearchResponse: credential = self.openviking.credential_for_user( request.user_id, request.user_key, agent_id=request.session_id, ) everos_method = "agentic" if request.use_llm else "hybrid" async def search_openviking() -> dict[str, Any]: return await self.openviking.search( credential, request.query, request.limit, request.level, request.score_threshold, request.target_uri, ) async def search_everos() -> dict[str, Any]: return await self.everos.search( request.user_id, request.session_id, request.query, everos_method, request.limit, ) backends = await self._run_backends(openviking=search_openviking, everos=search_everos) backends = self._remove_vectors_from_backends(backends) items = self._merge_search_items(backends) compact_backends = self._compact_search_backends(backends) return SearchResponse( status=self._aggregate_status(backends), items=items[: request.limit], backends=compact_backends, ) async def get_session_context(self, session_id: str, request: SessionContextRequest) -> SessionContextResponse: credential = self.openviking.credential_for_user( request.user_id, request.user_key, agent_id=session_id, ) async def read_openviking_context() -> dict[str, Any]: return await self.openviking.get_session_context(credential, session_id) async def search_everos() -> dict[str, Any]: return await self.everos.search( request.user_id, session_id, request.query, "hybrid", request.limit, ) backends = await self._run_backends(openviking=read_openviking_context, everos=search_everos) backends = self._remove_vectors_from_backends(backends) context = self._context_from_openviking_result(backends["openviking"].result) items = ( self._items_from_backend_result("everos", backends["everos"].result)[: request.limit] if backends["everos"].status == "success" else [] ) return SessionContextResponse( status=self._aggregate_status(backends), context=context, items=items, backends=self._compact_session_context_backends(backends), ) async def get_profile( self, user_id: str, user_key: str, query: str = "用户画像", limit: int = 10, level: int = 2, ) -> ProfileResponse: credential = self.openviking.credential_for_user(user_id, user_key) backends = await self._run_backends( everos=lambda: self.everos.get_profile(user_id), openviking=lambda: self.openviking.search_profile_memories(credential, query, limit, level), ) backends = self._remove_vectors_from_backends(backends) profile = backends["everos"].result if backends["everos"].status == "success" else None items = ( self._items_from_backend_result("openviking", backends["openviking"].result)[:limit] if backends["openviking"].status == "success" else [] ) return ProfileResponse( status=self._aggregate_status(backends), profile=profile, items=items, backends=self._compact_profile_backends(backends), ) async def health(self) -> dict[str, Any]: backends = await self._run_backends(openviking=self.openviking.health, everos=self.everos.health) return {"status": self._aggregate_status(backends), "backends": backends} def _messages_from_request(self, request: MessageIngestRequest) -> list[dict[str, str]]: messages = [] if request.user_message: messages.append({"role": "user", "content": request.user_message}) if request.assistant_message: messages.append({"role": "assistant", "content": request.assistant_message}) return messages def _is_remote_url(self, path: str) -> bool: return urlparse(path).scheme in {"http", "https"} def _temp_file_id_from_result(self, result: Any) -> str: data = result.get("result") if isinstance(result, dict) and isinstance(result.get("result"), dict) else result temp_file_id = data.get("temp_file_id") if isinstance(data, dict) else None if not temp_file_id: raise ValueError("OpenViking temp upload response missing temp_file_id") return str(temp_file_id) async def _run_backends(self, **calls: Callable[[], Awaitable[Any]]) -> dict[str, BackendStatus]: names = list(calls) results = await asyncio.gather(*(self._capture(calls[name]) for name in names)) return dict(zip(names, results)) async def _capture(self, call: Callable[[], Awaitable[Any]]) -> BackendStatus: try: return BackendStatus(status="success", result=await call()) except Exception as exc: # noqa: BLE001 message = str(exc) error = f"{type(exc).__name__}: {message}" if message else type(exc).__name__ return BackendStatus(status="failed", error=error) def _aggregate_status(self, backends: dict[str, BackendStatus]) -> str: statuses = {backend.status for backend in backends.values()} if statuses == {"success"}: return "success" if "success" in statuses: return "partial_success" return "failed" def _merge_search_items(self, backends: dict[str, BackendStatus]) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] for backend_name, backend in backends.items(): if backend.status != "success": continue items.extend(self._items_from_backend_result(backend_name, backend.result)) return items def _items_from_backend_result(self, backend_name: str, result: Any) -> list[dict[str, Any]]: if isinstance(result, dict) and isinstance(result.get("items"), list): return [self._with_backend(backend_name, item) for item in result["items"] if isinstance(item, dict)] data = result.get("data") if isinstance(result, dict) and isinstance(result.get("data"), dict) else result if not isinstance(data, dict): return [] if isinstance(data.get("result"), dict): data = data["result"] raw_items: list[dict[str, Any]] = [] for key in ("memories", "resources", "episodes", "profiles", "raw_messages"): values = data.get(key) if isinstance(values, list): raw_items.extend( self._compact_search_item(backend_name, key, item) for item in values if isinstance(item, dict) ) return [self._with_backend(backend_name, item) for item in raw_items] def _with_backend(self, backend_name: str, item: dict[str, Any]) -> dict[str, Any]: if "source_backend" in item: return item return {"source_backend": backend_name, **item} def _compact_search_item(self, backend_name: str, collection: str, item: dict[str, Any]) -> dict[str, Any]: if backend_name == "everos": fields = ( "id", "user_id", "session_id", "timestamp", "summary", "score", ) compact = {"memory_type": self._singular_memory_type(collection)} compact.update({field: item[field] for field in fields if field in item and item[field] is not None}) return compact return item def _singular_memory_type(self, collection: str) -> str: names = { "memories": "memory", "resources": "resource", "episodes": "episode", "profiles": "profile", "raw_messages": "raw_message", } return names.get(collection, collection) def _compact_search_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]: return { name: backend.model_copy(update={"result": self._compact_backend_result(name, backend.result)}) for name, backend in backends.items() } def _compact_backend_result(self, backend_name: str, result: Any) -> Any: if backend_name == "everos": data = result.get("data") if isinstance(result, dict) and isinstance(result.get("data"), dict) else result if not isinstance(data, dict): return result compact: dict[str, Any] = { "counts": { key: len(data.get(key) or []) for key in ("episodes", "profiles", "raw_messages") if isinstance(data.get(key), list) } } if "query" in data: compact["query"] = data["query"] return compact if backend_name == "openviking": data = result.get("result") if isinstance(result, dict) and isinstance(result.get("result"), dict) else result if not isinstance(data, dict): return result compact = { "status": result.get("status") if isinstance(result, dict) else None, "total": data.get("total"), "counts": { key: len(data.get(key) or []) for key in ("memories", "resources", "skills") if isinstance(data.get(key), list) }, } if "query_plan" in data: compact["query_plan"] = data["query_plan"] return {key: value for key, value in compact.items() if value is not None} return result def _context_from_openviking_result(self, result: Any) -> dict[str, Any] | None: if not isinstance(result, dict): return None data = result.get("result") if isinstance(result.get("result"), dict) else result return data if isinstance(data, dict) else None def _compact_session_context_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]: return { name: backend.model_copy(update={"result": self._compact_session_context_backend_result(name, backend.result)}) for name, backend in backends.items() } def _compact_session_context_backend_result(self, backend_name: str, result: Any) -> Any: if backend_name == "openviking": data = self._context_from_openviking_result(result) if data is None: return result compact = { "status": result.get("status") if isinstance(result, dict) else None, "estimatedTokens": data.get("estimatedTokens"), "stats": data.get("stats"), "has_latest_archive_overview": bool(data.get("latest_archive_overview")), "message_count": len(data.get("messages") or []) if isinstance(data.get("messages"), list) else 0, } return {key: value for key, value in compact.items() if value is not None} return self._compact_backend_result(backend_name, result) def _compact_profile_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]: return { name: backend.model_copy(update={"result": self._compact_profile_backend_result(name, backend.result)}) for name, backend in backends.items() } def _compact_profile_backend_result(self, backend_name: str, result: Any) -> Any: if backend_name == "openviking": return self._compact_backend_result("openviking", result) if backend_name == "everos": data = result.get("data") if isinstance(result, dict) and isinstance(result.get("data"), dict) else result if not isinstance(data, dict): return result compact: dict[str, Any] = {} for key in ("total_count", "count"): if key in data: compact[key] = data[key] compact["counts"] = { key: len(data.get(key) or []) for key in ("episodes", "profiles", "agent_cases", "agent_skills") if isinstance(data.get(key), list) } return compact return result def _remove_vectors_from_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]: return { name: backend.model_copy(update={"result": self._remove_vectors(backend.result)}) for name, backend in backends.items() } def _remove_vectors(self, value: Any) -> Any: if isinstance(value, dict): return {key: self._remove_vectors(item) for key, item in value.items() if key != "vector"} if isinstance(value, list): return [self._remove_vectors(item) for item in value] return value