"""Orchestration for the lightweight Memory System API.""" from __future__ import annotations import asyncio from typing import Any, Awaitable, Callable from .clients import EverOSMemorySystemClient, OpenVikingMemorySystemClient from .schemas import ( BackendStatus, CommitResponse, ExtractResponse, MessageIngestRequest, MessageIngestResponse, ProfileResponse, SearchRequest, SearchResponse, ) class MemorySystemService: def __init__(self, openviking: Any | None = None, everos: Any | None = None) -> None: self.openviking = openviking or OpenVikingMemorySystemClient() self.everos = everos or EverOSMemorySystemClient() async def ingest_messages(self, request: MessageIngestRequest) -> MessageIngestResponse: messages = self._messages_from_request(request) if not messages: raise ValueError("at least one message is required") credential = await self.openviking.ensure_user(request.user_id) await self.openviking.ensure_session(credential, request.session_id) async def write_openviking() -> list[dict[str, Any]]: results = [] for message in messages: results.append( await self.openviking.append_message(credential, request.session_id, message["role"], message["content"]) ) return results async def write_everos() -> list[dict[str, Any]]: results = [] for message in messages: results.append( await self.everos.append_message(request.user_id, request.session_id, message["role"], message["content"]) ) return results backends = await self._run_backends(openviking=write_openviking, everos=write_everos) return MessageIngestResponse( status=self._aggregate_status(backends), message_count=len(messages), backends=backends, ) async def commit_session(self, user_id: str, session_id: str) -> CommitResponse: user_key = await self.openviking.ensure_user(user_id) async def commit_openviking() -> dict[str, Any]: return await self.openviking.commit_session(user_key, session_id) async def flush_everos() -> dict[str, Any]: return await self.everos.flush(user_id, session_id) backends = await self._run_backends(openviking=commit_openviking, everos=flush_everos) return CommitResponse(status=self._aggregate_status(backends), backends=backends) async def extract_session(self, user_id: str, session_id: str) -> ExtractResponse: user_key = await self.openviking.ensure_user(user_id) backends = { "openviking": await self._capture(lambda: self.openviking.extract_session(user_key, session_id)), } return ExtractResponse(status=self._aggregate_status(backends), backends=backends) async def get_openviking_task(self, user_id: str, task_id: str) -> dict[str, Any]: user_key = await self.openviking.ensure_user(user_id) return await self.openviking.get_task(user_key, task_id) async def search(self, request: SearchRequest) -> SearchResponse: user_key = await self.openviking.ensure_user(request.user_id) everos_method = "agentic" if request.use_llm else "hybrid" async def search_openviking() -> dict[str, Any]: if request.use_llm: return await self.openviking.search(user_key, request.session_id, request.query, request.limit) return await self.openviking.find(user_key, request.user_id, request.query, request.limit) async def search_everos() -> dict[str, Any]: return await self.everos.search( request.user_id, request.session_id, request.query, everos_method, request.limit, ) backends = await self._run_backends(openviking=search_openviking, everos=search_everos) backends = self._remove_vectors_from_backends(backends) items = self._merge_search_items(backends) return SearchResponse(status=self._aggregate_status(backends), items=items[: request.limit], backends=backends) async def get_profile(self, user_id: str) -> ProfileResponse: backends = {"everos": await self._capture(lambda: self.everos.get_profile(user_id))} profile = backends["everos"].result if backends["everos"].status == "success" else None return ProfileResponse(status=self._aggregate_status(backends), profile=profile, backends=backends) async def health(self) -> dict[str, Any]: backends = await self._run_backends(openviking=self.openviking.health, everos=self.everos.health) return {"status": self._aggregate_status(backends), "backends": backends} def _messages_from_request(self, request: MessageIngestRequest) -> list[dict[str, str]]: messages = [] if request.user_message: messages.append({"role": "user", "content": request.user_message}) if request.assistant_message: messages.append({"role": "assistant", "content": request.assistant_message}) return messages async def _run_backends(self, **calls: Callable[[], Awaitable[Any]]) -> dict[str, BackendStatus]: names = list(calls) results = await asyncio.gather(*(self._capture(calls[name]) for name in names)) return dict(zip(names, results)) async def _capture(self, call: Callable[[], Awaitable[Any]]) -> BackendStatus: try: return BackendStatus(status="success", result=await call()) except Exception as exc: # noqa: BLE001 message = str(exc) error = f"{type(exc).__name__}: {message}" if message else type(exc).__name__ return BackendStatus(status="failed", error=error) def _aggregate_status(self, backends: dict[str, BackendStatus]) -> str: statuses = {backend.status for backend in backends.values()} if statuses == {"success"}: return "success" if "success" in statuses: return "partial_success" return "failed" def _merge_search_items(self, backends: dict[str, BackendStatus]) -> list[dict[str, Any]]: items: list[dict[str, Any]] = [] for backend_name, backend in backends.items(): if backend.status != "success": continue items.extend(self._items_from_backend_result(backend_name, backend.result)) return items def _items_from_backend_result(self, backend_name: str, result: Any) -> list[dict[str, Any]]: if isinstance(result, dict) and isinstance(result.get("items"), list): return [self._with_backend(backend_name, item) for item in result["items"] if isinstance(item, dict)] data = result.get("data") if isinstance(result, dict) and isinstance(result.get("data"), dict) else result if not isinstance(data, dict): return [] if isinstance(data.get("result"), dict): data = data["result"] raw_items: list[dict[str, Any]] = [] for key in ("memories", "resources", "episodes", "profiles", "raw_messages"): values = data.get(key) if isinstance(values, list): raw_items.extend(item for item in values if isinstance(item, dict)) return [self._with_backend(backend_name, item) for item in raw_items] def _with_backend(self, backend_name: str, item: dict[str, Any]) -> dict[str, Any]: if "source_backend" in item: return item return {"source_backend": backend_name, **item} def _remove_vectors_from_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]: return { name: backend.model_copy(update={"result": self._remove_vectors(backend.result)}) for name, backend in backends.items() } def _remove_vectors(self, value: Any) -> Any: if isinstance(value, dict): return {key: self._remove_vectors(item) for key, item in value.items() if key != "vector"} if isinstance(value, list): return [self._remove_vectors(item) for item in value] return value