481 lines
18 KiB
Python
481 lines
18 KiB
Python
"""Async clients for OpenViking and EverOS used by the lightweight API."""
|
|
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote

import httpx

from .config import get_config
from .store import ADMIN_ACCOUNT_ID, ADMIN_USER_ID, OpenVikingUserKeyStore
|
|
|
|
|
|
@dataclass(frozen=True)
class OpenVikingCredential:
    """Immutable bundle of the API key and identity used for one OpenViking call.

    The key is excluded from ``repr()`` so that logging or printing a
    credential does not expose the secret. It still takes part in equality
    and hashing.
    """

    # Secret sent in the ``X-API-Key`` header; hidden from repr on purpose.
    api_key: str = field(repr=False)
    # Sent as ``X-OpenViking-Account`` when ``user_key_auth`` is False.
    account_id: str | None = None
    # Sent as ``X-OpenViking-User`` when ``user_key_auth`` is False.
    user_id: str | None = None
    # Sent as ``X-OpenViking-Agent`` when ``user_key_auth`` is False.
    agent_id: str | None = None
    # True when ``api_key`` is a user key; the identity headers are then omitted.
    user_key_auth: bool = False
|
|
|
|
|
|
class OpenVikingMemorySystemClient:
    """Async HTTP client for the OpenViking API.

    Each call opens a short-lived ``httpx.AsyncClient`` that sends the API key
    in the ``X-API-Key`` header. User keys, sessions and commit tasks are
    recorded through the ``OpenVikingUserKeyStore`` held in ``self.store``.
    """

    def __init__(self, store: OpenVikingUserKeyStore | None = None) -> None:
        """Load connection settings from the application config.

        Args:
            store: Store used for keys, sessions and tasks. When ``None``, a
                store is created from ``config.storage.sqlite_path``.
        """
        config = get_config()
        self.base_url = config.openviking.url.rstrip("/")
        # NOTE(review): a placeholder key is used when none is configured;
        # confirm this fallback is only relied on in local development.
        self.root_key = config.openviking.api_key or "your-secret-root-key"
        self.timeout = config.openviking.timeout
        self.verify_ssl = config.openviking.verify_ssl
        # Compare against None so a caller-supplied store is never replaced
        # merely because it evaluates as falsy.
        self.store = store if store is not None else OpenVikingUserKeyStore(config.storage.sqlite_path)

    async def health(self) -> dict[str, Any]:
        """GET ``/health`` with the root key and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._client(self.root_key) as client:
            response = await client.get("/health")
            response.raise_for_status()
            return response.json()

    async def create_account(
        self,
        account_id: str = ADMIN_ACCOUNT_ID,
        admin_user_id: str = ADMIN_USER_ID,
    ) -> dict[str, Any]:
        """Create an account and persist the user key returned for its admin.

        A 409 response is returned as-is without touching the store. On
        success, any ``user_key`` found in the response is saved both as the
        account key and as the admin user's key.

        Raises:
            httpx.HTTPStatusError: For error statuses other than 409.
        """
        async with self._client(self.root_key) as client:
            response = await client.post(
                "/api/v1/admin/accounts",
                json=self._create_account_payload(account_id, admin_user_id),
            )
            if response.status_code == 409:
                return response.json()
            response.raise_for_status()
            data = response.json()

        user_key = self._extract_user_key(data)
        if user_key:
            self.store.save_account_key(account_id, admin_user_id, user_key)
            self.store.save_user_key(admin_user_id, user_key, account_id=account_id)
        return data

    async def create_user(self, user_id: str, role: str = "user") -> dict[str, Any]:
        """Return the stored key for ``user_id`` or create an account for them.

        When the store already has a key, a synthetic ``{"status": "ok",
        "result": {...}}`` response is built locally and no request is made.
        Otherwise an account named ``<user_id>_account`` is created with the
        user as its admin.

        Args:
            user_id: Identifier of the user.
            role: Not used by this implementation; kept so existing callers
                that pass it continue to work.
        """
        existing = self.store.get_user_key(user_id)
        account_id = self._account_id_for_user(user_id)
        if existing:
            return {
                "status": "ok",
                "result": {
                    "account_id": account_id,
                    "admin_user_id": user_id,
                    "user_id": user_id,
                    "user_key": existing,
                },
            }

        return await self.create_account(account_id, user_id)

    def credential_for_user(
        self,
        user_id: str,
        user_key: str,
        agent_id: str | None = None,
    ) -> OpenVikingCredential:
        """Build a credential after checking ``user_key`` against the store.

        Raises:
            PermissionError: If the store reports that the key does not match.
        """
        if not self.store.user_key_matches(user_id, user_key):
            raise PermissionError("Invalid user key")
        return self.user_credential(user_key, user_id, agent_id=agent_id)

    def user_credential(
        self,
        user_key: str,
        user_id: str,
        agent_id: str | None = None,
    ) -> OpenVikingCredential:
        """Build a user-key credential without validating the key."""
        return OpenVikingCredential(
            api_key=user_key,
            account_id=self._account_id_for_user(user_id),
            user_id=user_id,
            agent_id=agent_id,
            user_key_auth=True,
        )

    async def ensure_session(self, credential: OpenVikingCredential | str, session_id: str) -> dict[str, Any]:
        """Create the session if needed and record it in the store.

        Responses with status 409 or 422 are reported as
        ``{"session_id": ..., "status": "exists"}`` instead of raising.
        NOTE(review): 422 is treated the same as 409 here — confirm the
        server uses it for already-existing sessions.

        Raises:
            httpx.HTTPStatusError: For any other error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post("/api/v1/sessions", json={"session_id": session_id})
            if response.status_code in {409, 422}:
                self._save_session(credential, session_id)
                return {"session_id": session_id, "status": "exists"}
            response.raise_for_status()
            self._save_session(credential, session_id)
            return response.json()

    async def append_message(
        self, credential: OpenVikingCredential | str, session_id: str, role: str, content: str
    ) -> dict[str, Any]:
        """POST one message to the session and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post(
                f"/api/v1/sessions/{self._path_segment(session_id)}/messages",
                json={"role": role, "content": content},
            )
            response.raise_for_status()
            return response.json()

    async def commit_session(self, credential: OpenVikingCredential | str, session_id: str) -> dict[str, Any]:
        """Commit the session with ``keep_recent_count`` 0 and record the task.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post(
                f"/api/v1/sessions/{self._path_segment(session_id)}/commit",
                json={"keep_recent_count": 0},
            )
            response.raise_for_status()
            data = response.json()
            self._save_commit_metadata(credential, session_id, data)
            return data

    async def extract_session(self, credential: OpenVikingCredential | str, session_id: str) -> dict[str, Any]:
        """POST to the session's ``extract`` endpoint and return the JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post(f"/api/v1/sessions/{self._path_segment(session_id)}/extract")
            response.raise_for_status()
            return response.json()

    async def get_task(self, credential: OpenVikingCredential | str, task_id: str) -> dict[str, Any]:
        """GET ``/api/v1/tasks/<task_id>`` and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.get(f"/api/v1/tasks/{self._path_segment(task_id)}")
            response.raise_for_status()
            return response.json()

    async def upload_temp_file(self, credential: OpenVikingCredential | str, path: str | Path) -> dict[str, Any]:
        """Upload a local file as multipart form data under the ``file`` field.

        Raises:
            OSError: If the file cannot be opened.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        file_path = Path(path)
        # No JSON Content-Type default here: httpx must set the multipart
        # boundary header itself.
        async with self._credential_client(credential, json_content_type=False) as client:
            with file_path.open("rb") as file_obj:
                response = await client.post(
                    "/api/v1/resources/temp_upload",
                    files={"file": (file_path.name, file_obj)},
                )
            response.raise_for_status()
            return response.json()

    async def add_resource(
        self,
        credential: OpenVikingCredential | str,
        *,
        to: str,
        reason: str | None = None,
        wait: bool = True,
        directly_upload_media: bool = True,
        path: str | None = None,
        temp_file_id: str | None = None,
    ) -> dict[str, Any]:
        """POST a resource description to ``/api/v1/resources``.

        ``temp_file_id`` takes precedence: when it is given, ``path`` is not
        sent. Otherwise ``path`` is sent even if it is ``None``. ``reason`` is
        included only when provided.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        payload: dict[str, Any] = {
            "to": to,
            "wait": wait,
            "directly_upload_media": directly_upload_media,
        }
        if reason is not None:
            payload["reason"] = reason
        if temp_file_id is not None:
            payload["temp_file_id"] = temp_file_id
        else:
            payload["path"] = path

        async with self._credential_client(credential) as client:
            response = await client.post("/api/v1/resources", json=payload)
            response.raise_for_status()
            return response.json()

    async def delete_resource(
        self,
        credential: OpenVikingCredential | str,
        uri: str,
        recursive: bool = True,
    ) -> dict[str, Any]:
        """DELETE ``/api/v1/fs`` for ``uri``; recursive unless told otherwise.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._delete_uri(credential, uri, recursive)

    async def list_memories(
        self,
        credential: OpenVikingCredential | str,
        uri: str = "viking://user/memories",
        recursive: bool = True,
    ) -> dict[str, Any]:
        """GET ``/api/v1/fs/ls`` for ``uri`` and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.get(
                "/api/v1/fs/ls",
                params={"uri": uri, "recursive": str(recursive).lower()},
            )
            response.raise_for_status()
            return response.json()

    async def read_memory(self, credential: OpenVikingCredential | str, uri: str) -> dict[str, Any]:
        """GET ``/api/v1/content/read`` for ``uri`` and return the JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.get("/api/v1/content/read", params={"uri": uri})
            response.raise_for_status()
            return response.json()

    async def write_memory(
        self,
        credential: OpenVikingCredential | str,
        *,
        uri: str,
        content: str,
        mode: str = "create",
        wait: bool = True,
    ) -> dict[str, Any]:
        """POST ``uri``, ``content``, ``mode`` and ``wait`` to ``/api/v1/content/write``.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post(
                "/api/v1/content/write",
                json={
                    "uri": uri,
                    "content": content,
                    "mode": mode,
                    "wait": wait,
                },
            )
            response.raise_for_status()
            return response.json()

    async def delete_memory(
        self,
        credential: OpenVikingCredential | str,
        uri: str,
        recursive: bool = False,
    ) -> dict[str, Any]:
        """DELETE ``/api/v1/fs`` for ``uri``; non-recursive unless told otherwise.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        return await self._delete_uri(credential, uri, recursive)

    async def find(self, credential: OpenVikingCredential | str, query: str, limit: int) -> dict[str, Any]:
        """POST a query to ``/api/v1/search/find``.

        The target is ``viking://user/<user_id>/memories/`` when the
        credential carries a user id, otherwise ``viking://user/memories/``.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        user_id = credential.user_id if isinstance(credential, OpenVikingCredential) else None
        target_uri = f"viking://user/{user_id}/memories/" if user_id else "viking://user/memories/"
        async with self._credential_client(credential) as client:
            response = await client.post(
                "/api/v1/search/find",
                json={
                    "query": query,
                    "target_uri": target_uri,
                    "limit": limit,
                },
            )
            response.raise_for_status()
            return response.json()

    async def search(
        self,
        credential: OpenVikingCredential | str,
        query: str,
        limit: int,
        level: int = 2,
        score_threshold: float = 0.8,
        target_uri: str = "viking://user/memories",
    ) -> dict[str, Any]:
        """POST a query with level and score threshold to ``/api/v1/search/search``.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        payload: dict[str, Any] = {
            "query": query,
            "target_uri": target_uri,
            "limit": limit,
            "level": level,
            "score_threshold": score_threshold,
        }
        async with self._credential_client(credential) as client:
            response = await client.post("/api/v1/search/search", json=payload)
            response.raise_for_status()
            return response.json()

    async def search_profile_memories(
        self,
        credential: OpenVikingCredential | str,
        query: str,
        limit: int,
        level: int,
    ) -> dict[str, Any]:
        """POST a query to ``/api/v1/search/search`` against ``viking://user/memories``.

        Unlike ``search``, no ``score_threshold`` is sent.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.post(
                "/api/v1/search/search",
                json={
                    "query": query,
                    "limit": limit,
                    "level": level,
                    "target_uri": "viking://user/memories",
                },
            )
            response.raise_for_status()
            return response.json()

    async def get_session_context(self, credential: OpenVikingCredential | str, session_id: str) -> dict[str, Any]:
        """GET the session's ``context`` endpoint and return the JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        async with self._credential_client(credential) as client:
            response = await client.get(f"/api/v1/sessions/{self._path_segment(session_id)}/context")
            response.raise_for_status()
            return response.json()

    async def _delete_uri(
        self,
        credential: OpenVikingCredential | str,
        uri: str,
        recursive: bool,
    ) -> dict[str, Any]:
        """Issue the shared ``DELETE /api/v1/fs`` request for resources and memories."""
        async with self._credential_client(credential) as client:
            response = await client.delete(
                "/api/v1/fs",
                params={"uri": uri, "recursive": str(recursive).lower()},
            )
            response.raise_for_status()
            return response.json()

    @staticmethod
    def _path_segment(value: str) -> str:
        """Percent-encode ``value`` so it occupies exactly one URL path segment.

        Without this, an id containing ``/``, ``?`` or ``#`` would change
        which endpoint the request reaches.
        """
        return quote(str(value), safe="")

    def _credential_client(
        self,
        credential: OpenVikingCredential | str,
        json_content_type: bool = True,
    ) -> httpx.AsyncClient:
        """Build an HTTP client for ``credential``.

        A plain string, or a credential with ``user_key_auth`` set, is sent as
        the API key only. Any other credential also sends whichever of the
        ``X-OpenViking-Account`` / ``-User`` / ``-Agent`` headers it has
        values for.
        """
        if isinstance(credential, str):
            return self._client(credential, json_content_type=json_content_type)
        if credential.user_key_auth:
            return self._client(credential.api_key, json_content_type=json_content_type)
        headers: dict[str, str] = {}
        if credential.account_id:
            headers["X-OpenViking-Account"] = credential.account_id
        if credential.user_id:
            headers["X-OpenViking-User"] = credential.user_id
        if credential.agent_id:
            headers["X-OpenViking-Agent"] = credential.agent_id
        return self._client(credential.api_key, headers, json_content_type=json_content_type)

    def _client(
        self,
        api_key: str,
        extra_headers: dict[str, str] | None = None,
        json_content_type: bool = True,
    ) -> httpx.AsyncClient:
        """Create an ``httpx.AsyncClient`` bound to the configured base URL.

        ``extra_headers`` are applied last, so they override the defaults.
        """
        headers = {"X-API-Key": api_key}
        if json_content_type:
            headers["Content-Type"] = "application/json"
        if extra_headers:
            headers.update(extra_headers)
        return httpx.AsyncClient(
            base_url=self.base_url,
            headers=headers,
            timeout=self.timeout,
            verify=self.verify_ssl,
        )

    @staticmethod
    def _result_section(data: Any) -> dict[str, Any] | None:
        """Return ``data["result"]`` when it is a dict, else ``data`` itself.

        Returns ``None`` when ``data`` is not a dict, so callers never call
        ``.get`` on a list or scalar JSON body.
        """
        if not isinstance(data, dict):
            return None
        nested = data.get("result")
        return nested if isinstance(nested, dict) else data

    def _extract_user_key(self, data: dict[str, Any]) -> str | None:
        """Return the ``user_key`` from a response as a string, or ``None``.

        The key is looked up in the nested ``result`` object when present,
        otherwise at the top level. Empty or missing values yield ``None``.
        """
        result = self._result_section(data)
        value = result.get("user_key") if result is not None else None
        return str(value) if value else None

    def _create_account_payload(self, account_id: str, admin_user_id: str) -> dict[str, Any]:
        """Build the JSON body for the account-creation request."""
        return {
            "account_id": account_id,
            "admin_user_id": admin_user_id,
        }

    def _account_id_for_user(self, user_id: str) -> str:
        """Return the per-user account id, ``<user_id>_account``."""
        return f"{user_id}_account"

    def _save_session(self, credential: OpenVikingCredential | str, session_id: str) -> None:
        """Record the session in the store when the credential names a user."""
        if isinstance(credential, OpenVikingCredential) and credential.user_id:
            self.store.save_session(credential.user_id, session_id)

    def _save_commit_metadata(
        self,
        credential: OpenVikingCredential | str,
        session_id: str,
        data: dict[str, Any],
    ) -> None:
        """Record the commit's task id and archive URI in the store.

        Nothing is saved when the credential does not name a user, when the
        response is not a JSON object, or when it carries no ``task_id``.
        """
        if not isinstance(credential, OpenVikingCredential) or not credential.user_id:
            return
        result = self._result_section(data)
        if result is None:
            return
        task_id = result.get("task_id")
        if not task_id:
            return
        archive_uri = result.get("archive_uri")
        self.store.save_task(
            user_id=credential.user_id,
            session_id=session_id,
            task_id=str(task_id),
            archive_uri=str(archive_uri) if archive_uri else None,
        )
|
|
|
|
|
|
class EverOSMemorySystemClient:
    """Async HTTP client for the EverOS memory API.

    Connection settings come from the ``everos`` section of the application
    config. Each call opens its own short-lived ``httpx.AsyncClient``.
    """

    def __init__(self) -> None:
        """Copy URL, key, timeout, TLS and health-path settings from config."""
        settings = get_config().everos
        self.base_url = settings.url.rstrip("/")
        self.api_key = settings.api_key
        self.timeout = settings.timeout
        self.verify_ssl = settings.verify_ssl
        self.health_path = settings.health_path

    async def health(self) -> dict[str, Any]:
        """GET the configured health path and return the decoded JSON body."""
        async with self._client() as http:
            reply = await http.get(self.health_path)
            reply.raise_for_status()
            return reply.json()

    async def append_message(self, user_id: str, session_id: str, role: str, content: str) -> dict[str, Any]:
        """POST a single message to ``/api/v1/memories``."""
        body = self.build_message_payload(
            user_id=user_id,
            session_id=session_id,
            role=role,
            content=content,
        )
        return await self._post_json("/api/v1/memories", body)

    def build_message_payload(self, user_id: str, session_id: str, role: str, content: str) -> dict[str, Any]:
        """Build the request body carrying one message.

        Only ``"assistant"`` is kept as a role; every other value is sent as
        ``"user"``. Assistant messages use ``"assistant"`` as sender id and
        name, all others use ``user_id``.
        """
        from_assistant = role == "assistant"
        mapped_role = "assistant" if from_assistant else "user"
        sender = "assistant" if from_assistant else user_id
        # Current UTC time in whole milliseconds.
        # NOTE(review): message_id is derived from this value alone, so two
        # payloads built within the same millisecond share an id.
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        message = {
            "message_id": f"msg_{now_ms}",
            "timestamp": now_ms,
            "sender_id": sender,
            "sender_name": sender,
            "role": mapped_role,
            "content": content,
        }
        return {
            "user_id": user_id,
            "session_id": session_id,
            "messages": [message],
        }

    async def flush(self, user_id: str, session_id: str) -> dict[str, Any]:
        """POST the user and session ids to ``/api/v1/memories/flush``."""
        return await self._post_json(
            "/api/v1/memories/flush",
            {"user_id": user_id, "session_id": session_id},
        )

    async def search(self, user_id: str, session_id: str | None, query: str, method: str, limit: int) -> dict[str, Any]:
        """POST a search over episodic, profile and raw-message memories.

        Results are always filtered by ``user_id``; ``session_id`` is added
        to the filters only when it is non-empty.
        """
        scope: dict[str, Any] = {"user_id": user_id}
        if session_id:
            scope["session_id"] = session_id
        body = {
            "query": query,
            "method": method,
            "memory_types": ["episodic_memory", "profile", "raw_message"],
            "filters": scope,
            "top_k": limit,
            "include_original_data": True,
        }
        return await self._post_json("/api/v1/memories/search", body)

    async def get_profile(self, user_id: str) -> dict[str, Any]:
        """POST a request for page 1 (size 20) of the user's profile memories."""
        body = {
            "memory_type": "profile",
            "filters": {"user_id": user_id},
            "page": 1,
            "page_size": 20,
        }
        return await self._post_json("/api/v1/memories/get", body)

    async def _post_json(self, path: str, body: dict[str, Any]) -> dict[str, Any]:
        """POST ``body`` as JSON to ``path``; raise on error status, else decode."""
        async with self._client() as http:
            reply = await http.post(path, json=body)
            reply.raise_for_status()
            return reply.json()

    def _client(self) -> httpx.AsyncClient:
        """Create an ``httpx.AsyncClient`` for the configured EverOS endpoint.

        When an API key is configured it is sent both as ``X-API-Key`` and as
        an ``Authorization: Bearer`` header.
        """
        auth_headers = (
            {"X-API-Key": self.api_key, "Authorization": f"Bearer {self.api_key}"}
            if self.api_key
            else {}
        )
        return httpx.AsyncClient(
            base_url=self.base_url,
            headers={"Content-Type": "application/json", **auth_headers},
            timeout=self.timeout,
            verify=self.verify_ssl,
        )
|