555 lines
24 KiB
Python
555 lines
24 KiB
Python
"""Orchestration for the lightweight Memory System API."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any, Awaitable, Callable
|
|
from urllib.parse import urlparse
|
|
|
|
from .clients import EverOSMemorySystemClient, OpenVikingMemorySystemClient
|
|
from .schemas import (
|
|
AccountResponse,
|
|
BackendStatus,
|
|
CommitResponse,
|
|
ExtractResponse,
|
|
MemoryOperationResponse,
|
|
MemoryWriteRequest,
|
|
MessageIngestRequest,
|
|
MessageIngestResponse,
|
|
ProfileResponse,
|
|
ResourceMutationResponse,
|
|
ResourceUploadRequest,
|
|
SearchRequest,
|
|
SearchResponse,
|
|
SessionContextRequest,
|
|
SessionContextResponse,
|
|
)
|
|
|
|
|
|
class MemorySystemService:
    """Facade that fans memory operations out to the OpenViking and EverOS clients.

    Most backend calls are wrapped by ``_capture`` so that an exception from
    one backend is recorded as a ``BackendStatus`` with ``status="failed"``
    instead of propagating. Responses carry an aggregate ``status`` of
    ``"success"``, ``"partial_success"`` or ``"failed"`` plus the per-backend
    statuses. ``credential_for_user`` and ``get_openviking_task`` are called
    directly, so their exceptions reach the caller.
    """

    def __init__(self, openviking: Any | None = None, everos: Any | None = None) -> None:
        """Store the backend clients, building default ones when none are injected.

        Args:
            openviking: Client used for OpenViking calls. When ``None`` a
                default ``OpenVikingMemorySystemClient`` is constructed.
            everos: Client used for EverOS calls. When ``None`` a default
                ``EverOSMemorySystemClient`` is constructed.
        """
        # Compare against None rather than relying on truthiness: an injected
        # client that happens to be falsy (defines __bool__/__len__) must not
        # be silently replaced by a freshly constructed default client.
        self.openviking = openviking if openviking is not None else OpenVikingMemorySystemClient()
        self.everos = everos if everos is not None else EverOSMemorySystemClient()

    # ------------------------------------------------------------------
    # Accounts and resources (OpenViking only)
    # ------------------------------------------------------------------

    async def create_user(self, user_id: str) -> AccountResponse:
        """Create ``user_id`` in OpenViking and report the backend outcome."""
        backends = {"openviking": await self._capture(lambda: self.openviking.create_user(user_id))}
        return AccountResponse(
            status=self._aggregate_status(backends),
            account=self._result_if_success(backends["openviking"]),
            backends=backends,
        )

    async def upload_resource(self, request: ResourceUploadRequest) -> ResourceMutationResponse:
        """Add a resource to OpenViking from a remote URL or a local path.

        A ``request.path`` whose scheme is ``http`` or ``https`` is handed to
        ``add_resource`` directly. Any other path is first sent through
        ``upload_temp_file`` and then added via the returned ``temp_file_id``.
        """
        credential = self.openviking.credential_for_user(request.user_id, request.user_key)

        async def upload_openviking() -> dict[str, Any]:
            if self._is_remote_url(request.path):
                return await self.openviking.add_resource(
                    credential,
                    path=request.path,
                    to=request.to,
                    reason=request.reason,
                    wait=request.wait,
                    directly_upload_media=request.directly_upload_media,
                )

            temp_upload = await self.openviking.upload_temp_file(credential, request.path)
            temp_file_id = self._temp_file_id_from_result(temp_upload)
            return await self.openviking.add_resource(
                credential,
                temp_file_id=temp_file_id,
                to=request.to,
                reason=request.reason,
                wait=request.wait,
                directly_upload_media=request.directly_upload_media,
            )

        backends = {"openviking": await self._capture(upload_openviking)}
        return ResourceMutationResponse(
            status=self._aggregate_status(backends),
            resource=self._result_if_success(backends["openviking"]),
            backends=backends,
        )

    async def delete_resource(
        self,
        user_id: str,
        user_key: str,
        uri: str,
        recursive: bool = True,
    ) -> ResourceMutationResponse:
        """Delete the OpenViking resource at ``uri`` (recursively by default)."""
        credential = self.openviking.credential_for_user(user_id, user_key)
        backends = {
            "openviking": await self._capture(lambda: self.openviking.delete_resource(credential, uri, recursive)),
        }
        return ResourceMutationResponse(
            status=self._aggregate_status(backends),
            resource=self._result_if_success(backends["openviking"]),
            backends=backends,
        )

    # ------------------------------------------------------------------
    # Memory CRUD (OpenViking only)
    # ------------------------------------------------------------------

    async def list_memories(
        self,
        user_id: str,
        user_key: str,
        uri: str = "viking://user/memories",
        recursive: bool = True,
    ) -> MemoryOperationResponse:
        """List OpenViking memories under ``uri`` (recursively by default)."""
        credential = self.openviking.credential_for_user(user_id, user_key)
        backends = {
            "openviking": await self._capture(lambda: self.openviking.list_memories(credential, uri, recursive)),
        }
        return self._memory_response(backends)

    async def read_memory(
        self,
        user_id: str,
        user_key: str,
        uri: str,
    ) -> MemoryOperationResponse:
        """Read the OpenViking memory stored at ``uri``."""
        credential = self.openviking.credential_for_user(user_id, user_key)
        backends = {
            "openviking": await self._capture(lambda: self.openviking.read_memory(credential, uri)),
        }
        return self._memory_response(backends)

    async def write_memory(self, request: MemoryWriteRequest) -> MemoryOperationResponse:
        """Write ``request.content`` to the OpenViking memory at ``request.uri``."""
        credential = self.openviking.credential_for_user(request.user_id, request.user_key)
        backends = {
            "openviking": await self._capture(
                lambda: self.openviking.write_memory(
                    credential,
                    uri=request.uri,
                    content=request.content,
                    mode=request.mode,
                    wait=request.wait,
                )
            ),
        }
        return self._memory_response(backends)

    async def delete_memory(
        self,
        user_id: str,
        user_key: str,
        uri: str,
        recursive: bool = False,
    ) -> MemoryOperationResponse:
        """Delete the OpenViking memory at ``uri`` (non-recursive by default)."""
        credential = self.openviking.credential_for_user(user_id, user_key)
        backends = {
            "openviking": await self._capture(lambda: self.openviking.delete_memory(credential, uri, recursive)),
        }
        return self._memory_response(backends)

    def _memory_response(self, backends: dict[str, BackendStatus]) -> MemoryOperationResponse:
        """Build the response shared by the OpenViking-only memory operations."""
        return MemoryOperationResponse(
            status=self._aggregate_status(backends),
            memory=self._result_if_success(backends["openviking"]),
            backends=backends,
        )

    # ------------------------------------------------------------------
    # Sessions (both backends)
    # ------------------------------------------------------------------

    async def ingest_messages(self, request: MessageIngestRequest) -> MessageIngestResponse:
        """Append the request's user/assistant messages to both backends.

        Raises:
            ValueError: If the request carries neither a user nor an
                assistant message.
        """
        messages = self._messages_from_request(request)
        if not messages:
            raise ValueError("at least one message is required")

        credential = self.openviking.credential_for_user(
            request.user_id,
            request.user_key,
            agent_id=request.session_id,
        )

        async def write_openviking() -> list[dict[str, Any]]:
            # ensure_session runs inside the captured call so that an
            # OpenViking failure is reported as a failed backend rather than
            # raising and preventing the EverOS write altogether.
            await self.openviking.ensure_session(credential, request.session_id)
            results = []
            # Messages are appended one at a time, in order.
            for message in messages:
                results.append(
                    await self.openviking.append_message(
                        credential, request.session_id, message["role"], message["content"]
                    )
                )
            return results

        async def write_everos() -> list[dict[str, Any]]:
            results = []
            for message in messages:
                results.append(
                    await self.everos.append_message(
                        request.user_id, request.session_id, message["role"], message["content"]
                    )
                )
            return results

        backends = await self._run_backends(openviking=write_openviking, everos=write_everos)
        return MessageIngestResponse(
            status=self._aggregate_status(backends),
            message_count=len(messages),
            backends=backends,
        )

    async def commit_session(self, user_id: str, user_key: str, session_id: str) -> CommitResponse:
        """Commit the OpenViking session and flush EverOS for ``session_id``."""
        credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id)

        async def commit_openviking() -> dict[str, Any]:
            return await self.openviking.commit_session(credential, session_id)

        async def flush_everos() -> dict[str, Any]:
            return await self.everos.flush(user_id, session_id)

        backends = await self._run_backends(openviking=commit_openviking, everos=flush_everos)
        return CommitResponse(status=self._aggregate_status(backends), backends=backends)

    async def extract_session(self, user_id: str, user_key: str, session_id: str) -> ExtractResponse:
        """Run OpenViking's ``extract_session`` for ``session_id``."""
        credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id)
        backends = {
            "openviking": await self._capture(lambda: self.openviking.extract_session(credential, session_id)),
        }
        return ExtractResponse(status=self._aggregate_status(backends), backends=backends)

    async def get_openviking_task(
        self,
        user_id: str,
        user_key: str,
        task_id: str,
        session_id: str | None = None,
    ) -> dict[str, Any]:
        """Return OpenViking's ``get_task`` result for ``task_id``.

        This call is not wrapped by ``_capture``; client exceptions propagate.
        """
        credential = self.openviking.credential_for_user(user_id, user_key, agent_id=session_id)
        return await self.openviking.get_task(credential, task_id)

    # ------------------------------------------------------------------
    # Retrieval (both backends)
    # ------------------------------------------------------------------

    async def search(self, request: SearchRequest) -> SearchResponse:
        """Search both backends and return merged items plus compact summaries.

        Items are merged in backend order (OpenViking first, then EverOS) and
        truncated to ``request.limit``. ``vector`` keys are stripped from the
        results, and OpenViking memory hits are refreshed with the content
        returned by ``read_memory``.
        """
        credential = self.openviking.credential_for_user(
            request.user_id,
            request.user_key,
            agent_id=request.session_id,
        )
        everos_method = "agentic" if request.use_llm else "hybrid"

        async def search_openviking() -> dict[str, Any]:
            return await self.openviking.search(
                credential,
                request.query,
                request.limit,
                request.level,
                request.score_threshold,
                request.target_uri,
            )

        async def search_everos() -> dict[str, Any]:
            return await self.everos.search(
                request.user_id,
                request.session_id,
                request.query,
                everos_method,
                request.limit,
            )

        backends = await self._run_backends(openviking=search_openviking, everos=search_everos)
        backends = self._remove_vectors_from_backends(backends)
        backends["openviking"] = await self._enrich_openviking_search_memory_content(
            backends["openviking"],
            credential,
        )
        items = self._merge_search_items(backends)
        compact_backends = self._compact_search_backends(backends)
        return SearchResponse(
            status=self._aggregate_status(backends),
            items=items[: request.limit],
            backends=compact_backends,
        )

    async def get_session_context(self, session_id: str, request: SessionContextRequest) -> SessionContextResponse:
        """Return the OpenViking session context plus EverOS hybrid-search items."""
        credential = self.openviking.credential_for_user(
            request.user_id,
            request.user_key,
            agent_id=session_id,
        )

        async def read_openviking_context() -> dict[str, Any]:
            return await self.openviking.get_session_context(credential, session_id)

        async def search_everos() -> dict[str, Any]:
            return await self.everos.search(
                request.user_id,
                session_id,
                request.query,
                "hybrid",
                request.limit,
            )

        backends = await self._run_backends(openviking=read_openviking_context, everos=search_everos)
        backends = self._remove_vectors_from_backends(backends)
        context = self._context_from_openviking_result(backends["openviking"].result)
        items = (
            self._items_from_backend_result("everos", backends["everos"].result)[: request.limit]
            if backends["everos"].status == "success"
            else []
        )
        return SessionContextResponse(
            status=self._aggregate_status(backends),
            context=context,
            items=items,
            backends=self._compact_session_context_backends(backends),
        )

    async def get_profile(
        self,
        user_id: str,
        user_key: str,
        query: str = "用户画像",
        limit: int = 10,
        level: int = 2,
    ) -> ProfileResponse:
        """Return the EverOS profile plus OpenViking profile-memory items.

        The default ``query`` is the Chinese phrase for "user profile".
        """
        credential = self.openviking.credential_for_user(user_id, user_key)
        backends = await self._run_backends(
            everos=lambda: self.everos.get_profile(user_id),
            openviking=lambda: self.openviking.search_profile_memories(credential, query, limit, level),
        )
        backends = self._remove_vectors_from_backends(backends)
        profile = self._result_if_success(backends["everos"])
        items = (
            self._items_from_backend_result("openviking", backends["openviking"].result)[:limit]
            if backends["openviking"].status == "success"
            else []
        )
        return ProfileResponse(
            status=self._aggregate_status(backends),
            profile=profile,
            items=items,
            backends=self._compact_profile_backends(backends),
        )

    async def health(self) -> dict[str, Any]:
        """Call both clients' ``health`` and return aggregate and per-backend status."""
        backends = await self._run_backends(openviking=self.openviking.health, everos=self.everos.health)
        return {"status": self._aggregate_status(backends), "backends": backends}

    # ------------------------------------------------------------------
    # Request / payload helpers
    # ------------------------------------------------------------------

    def _messages_from_request(self, request: MessageIngestRequest) -> list[dict[str, str]]:
        """Return role/content dicts for the non-empty messages, user first."""
        messages = []
        if request.user_message:
            messages.append({"role": "user", "content": request.user_message})
        if request.assistant_message:
            messages.append({"role": "assistant", "content": request.assistant_message})
        return messages

    def _is_remote_url(self, path: str) -> bool:
        """Return True when ``path`` parses with an ``http`` or ``https`` scheme."""
        return urlparse(path).scheme in {"http", "https"}

    def _temp_file_id_from_result(self, result: Any) -> str:
        """Extract ``temp_file_id`` (optionally nested under ``result``) as a string.

        Raises:
            ValueError: If no non-empty ``temp_file_id`` is present.
        """
        data = self._unwrap_envelope(result, "result")
        temp_file_id = data.get("temp_file_id") if isinstance(data, dict) else None
        if not temp_file_id:
            raise ValueError("OpenViking temp upload response missing temp_file_id")
        return str(temp_file_id)

    @staticmethod
    def _unwrap_envelope(payload: Any, key: str) -> Any:
        """Return ``payload[key]`` when it is a dict nested in a dict, else ``payload``."""
        if isinstance(payload, dict) and isinstance(payload.get(key), dict):
            return payload[key]
        return payload

    @staticmethod
    def _list_lengths(data: dict[str, Any], keys: tuple[str, ...]) -> dict[str, int]:
        """Map each of ``keys`` whose value in ``data`` is a list to that list's length."""
        return {key: len(data[key]) for key in keys if isinstance(data.get(key), list)}

    @staticmethod
    def _result_if_success(backend: BackendStatus) -> Any:
        """Return ``backend.result`` for a successful backend, otherwise ``None``."""
        return backend.result if backend.status == "success" else None

    # ------------------------------------------------------------------
    # Backend execution
    # ------------------------------------------------------------------

    async def _run_backends(self, **calls: Callable[[], Awaitable[Any]]) -> dict[str, BackendStatus]:
        """Run the named calls concurrently and map each name to its captured status."""
        names = list(calls)
        results = await asyncio.gather(*(self._capture(calls[name]) for name in names))
        return dict(zip(names, results))

    async def _capture(self, call: Callable[[], Awaitable[Any]]) -> BackendStatus:
        """Await ``call()`` and convert its outcome into a ``BackendStatus``.

        Any ``Exception`` becomes ``status="failed"`` with an error string of
        the form ``"<ExceptionType>: <message>"`` (type name only when the
        message is empty).
        """
        try:
            return BackendStatus(status="success", result=await call())
        except Exception as exc:  # noqa: BLE001
            message = str(exc)
            error = f"{type(exc).__name__}: {message}" if message else type(exc).__name__
            return BackendStatus(status="failed", error=error)

    def _aggregate_status(self, backends: dict[str, BackendStatus]) -> str:
        """Collapse per-backend statuses into one overall status string.

        Returns ``"success"`` when every backend succeeded,
        ``"partial_success"`` when at least one did, and ``"failed"``
        otherwise (including when ``backends`` is empty).
        """
        statuses = {backend.status for backend in backends.values()}
        if statuses == {"success"}:
            return "success"
        if "success" in statuses:
            return "partial_success"
        return "failed"

    # ------------------------------------------------------------------
    # Search result shaping
    # ------------------------------------------------------------------

    async def _enrich_openviking_search_memory_content(
        self,
        backend: BackendStatus,
        credential: Any,
    ) -> BackendStatus:
        """Refresh ``content`` on OpenViking memory hits, mutating them in place.

        The same ``backend`` object is returned; failed backends and results
        without memory items are returned untouched.
        """
        if backend.status != "success":
            return backend
        memory_items = self._openviking_search_memory_items(backend.result)
        if not memory_items:
            return backend
        await asyncio.gather(
            *(self._add_latest_memory_content(credential, item) for item in memory_items),
        )
        return backend

    async def _add_latest_memory_content(self, credential: Any, item: dict[str, Any]) -> None:
        """Set ``item["content"]`` from ``read_memory`` for ``viking://`` URIs.

        Best effort: items without a ``viking://`` URI, failed reads and
        non-string content all leave ``item`` unchanged.
        """
        uri = item.get("uri")
        if not isinstance(uri, str) or not uri.startswith("viking://"):
            return
        try:
            result = await self.openviking.read_memory(credential, uri)
        except Exception:  # noqa: BLE001
            # Deliberately swallowed: enrichment must not fail the search.
            return
        data = self._unwrap_envelope(result, "result")
        content = data.get("content") if isinstance(data, dict) else None
        if isinstance(content, str):
            item["content"] = content

    def _openviking_search_memory_items(self, result: Any) -> list[dict[str, Any]]:
        """Return the memory item dicts contained in an OpenViking search result.

        A top-level ``items`` list is filtered to memory-typed entries;
        otherwise the ``memories`` list (optionally nested under ``data``
        and/or ``result``) is returned.
        """
        if not isinstance(result, dict):
            return []
        if isinstance(result.get("items"), list):
            return [
                item for item in result["items"]
                if isinstance(item, dict) and self._is_openviking_memory_item(item)
            ]
        # ``result`` is a dict here, so both unwrap steps yield a dict.
        data = self._unwrap_envelope(self._unwrap_envelope(result, "data"), "result")
        memories = data.get("memories")
        if not isinstance(memories, list):
            return []
        return [item for item in memories if isinstance(item, dict)]

    def _is_openviking_memory_item(self, item: dict[str, Any]) -> bool:
        """Return True when ``context_type`` or ``memory_type`` equals ``"memory"``."""
        return item.get("context_type") == "memory" or item.get("memory_type") == "memory"

    def _merge_search_items(self, backends: dict[str, BackendStatus]) -> list[dict[str, Any]]:
        """Concatenate the items of every successful backend, in dict order."""
        items: list[dict[str, Any]] = []
        for backend_name, backend in backends.items():
            if backend.status != "success":
                continue
            items.extend(self._items_from_backend_result(backend_name, backend.result))
        return items

    def _items_from_backend_result(self, backend_name: str, result: Any) -> list[dict[str, Any]]:
        """Flatten a backend result into item dicts tagged with ``source_backend``.

        A top-level ``items`` list is used as-is. Otherwise the known
        collections are gathered from the payload (optionally nested under
        ``data`` and/or ``result``) and compacted per backend.
        """
        if isinstance(result, dict) and isinstance(result.get("items"), list):
            return [self._with_backend(backend_name, item) for item in result["items"] if isinstance(item, dict)]
        data = self._unwrap_envelope(result, "data")
        if not isinstance(data, dict):
            return []
        data = self._unwrap_envelope(data, "result")
        raw_items: list[dict[str, Any]] = []
        for key in ("memories", "resources", "episodes", "profiles", "raw_messages"):
            values = data.get(key)
            if isinstance(values, list):
                raw_items.extend(
                    self._compact_search_item(backend_name, key, item)
                    for item in values
                    if isinstance(item, dict)
                )
        return [self._with_backend(backend_name, item) for item in raw_items]

    def _with_backend(self, backend_name: str, item: dict[str, Any]) -> dict[str, Any]:
        """Return ``item`` tagged with ``source_backend`` unless already tagged."""
        if "source_backend" in item:
            return item
        return {"source_backend": backend_name, **item}

    def _compact_search_item(self, backend_name: str, collection: str, item: dict[str, Any]) -> dict[str, Any]:
        """Reduce EverOS items to a fixed field set; pass other backends' items through."""
        if backend_name == "everos":
            fields = (
                "id",
                "user_id",
                "session_id",
                "timestamp",
                "summary",
                "score",
            )
            compact = {"memory_type": self._singular_memory_type(collection)}
            compact.update({field: item[field] for field in fields if field in item and item[field] is not None})
            return compact
        return item

    def _singular_memory_type(self, collection: str) -> str:
        """Map a plural collection key to its singular type; unknown keys pass through."""
        names = {
            "memories": "memory",
            "resources": "resource",
            "episodes": "episode",
            "profiles": "profile",
            "raw_messages": "raw_message",
        }
        return names.get(collection, collection)

    # ------------------------------------------------------------------
    # Compact per-backend summaries
    # ------------------------------------------------------------------

    def _compact_search_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]:
        """Return copies of ``backends`` whose results are compact search summaries."""
        return {
            name: backend.model_copy(update={"result": self._compact_backend_result(name, backend.result)})
            for name, backend in backends.items()
        }

    def _compact_backend_result(self, backend_name: str, result: Any) -> Any:
        """Summarise a search result as counts plus a few metadata fields.

        Non-dict payloads and unknown backend names are returned unchanged.
        """
        if backend_name == "everos":
            data = self._unwrap_envelope(result, "data")
            if not isinstance(data, dict):
                return result
            compact: dict[str, Any] = {
                "counts": self._list_lengths(data, ("episodes", "profiles", "raw_messages")),
            }
            if "query" in data:
                compact["query"] = data["query"]
            return compact

        if backend_name == "openviking":
            data = self._unwrap_envelope(result, "result")
            if not isinstance(data, dict):
                return result
            # ``data`` being a dict implies ``result`` is a dict as well.
            compact = {
                "status": result.get("status"),
                "total": data.get("total"),
                "counts": self._list_lengths(data, ("memories", "resources", "skills")),
            }
            if "query_plan" in data:
                compact["query_plan"] = data["query_plan"]
            return {key: value for key, value in compact.items() if value is not None}

        return result

    def _context_from_openviking_result(self, result: Any) -> dict[str, Any] | None:
        """Return the context dict (optionally nested under ``result``) or ``None``."""
        if not isinstance(result, dict):
            return None
        data = self._unwrap_envelope(result, "result")
        return data if isinstance(data, dict) else None

    def _compact_session_context_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]:
        """Return copies of ``backends`` whose results are compact context summaries."""
        return {
            name: backend.model_copy(update={"result": self._compact_session_context_backend_result(name, backend.result)})
            for name, backend in backends.items()
        }

    def _compact_session_context_backend_result(self, backend_name: str, result: Any) -> Any:
        """Summarise a session-context result; non-OpenViking uses the search summary."""
        if backend_name == "openviking":
            data = self._context_from_openviking_result(result)
            if data is None:
                return result
            messages = data.get("messages")
            # A non-None context implies ``result`` is a dict.
            compact = {
                "status": result.get("status"),
                "estimatedTokens": data.get("estimatedTokens"),
                "stats": data.get("stats"),
                "has_latest_archive_overview": bool(data.get("latest_archive_overview")),
                "message_count": len(messages) if isinstance(messages, list) else 0,
            }
            return {key: value for key, value in compact.items() if value is not None}
        return self._compact_backend_result(backend_name, result)

    def _compact_profile_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]:
        """Return copies of ``backends`` whose results are compact profile summaries."""
        return {
            name: backend.model_copy(update={"result": self._compact_profile_backend_result(name, backend.result)})
            for name, backend in backends.items()
        }

    def _compact_profile_backend_result(self, backend_name: str, result: Any) -> Any:
        """Summarise a profile result; OpenViking reuses the search summary."""
        if backend_name == "openviking":
            return self._compact_backend_result("openviking", result)
        if backend_name == "everos":
            data = self._unwrap_envelope(result, "data")
            if not isinstance(data, dict):
                return result
            compact: dict[str, Any] = {}
            for key in ("total_count", "count"):
                if key in data:
                    compact[key] = data[key]
            compact["counts"] = self._list_lengths(
                data, ("episodes", "profiles", "agent_cases", "agent_skills")
            )
            return compact
        return result

    # ------------------------------------------------------------------
    # Vector stripping
    # ------------------------------------------------------------------

    def _remove_vectors_from_backends(self, backends: dict[str, BackendStatus]) -> dict[str, BackendStatus]:
        """Return copies of ``backends`` with ``vector`` keys stripped from results."""
        return {
            name: backend.model_copy(update={"result": self._remove_vectors(backend.result)})
            for name, backend in backends.items()
        }

    def _remove_vectors(self, value: Any) -> Any:
        """Recursively rebuild dicts/lists without any ``"vector"`` dict keys."""
        if isinstance(value, dict):
            return {key: self._remove_vectors(item) for key, item in value.items() if key != "vector"}
        if isinstance(value, list):
            return [self._remove_vectors(item) for item in value]
        return value
|