Files
memory-gateway/memory_system_api/schemas.py

78 lines
1.8 KiB
Python

"""Schemas for the lightweight Memory System API."""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
# Allowed values for every ``status`` field declared in this module
# (BackendStatus and the *Response models).
# NOTE(review): the exact meaning of "partial_success" is not defined here —
# presumably some backends succeeded while others failed; confirm with callers.
OperationStatus = Literal["success", "partial_success", "failed"]
class MessageIngestRequest(BaseModel):
    """Request payload carrying up to one user and one assistant message.

    The user and session identifiers are required. Both message fields are
    optional, and this schema does not require that either one be present.
    """

    # Required identifiers; ``min_length=1`` rejects the empty string but
    # still accepts whitespace-only values.
    user_id: str = Field(min_length=1)
    # NOTE(review): presumably a per-user credential — confirm with the handler.
    user_key: str = Field(min_length=1)
    session_id: str = Field(min_length=1)
    # Message texts; each defaults to None when omitted.
    user_message: str | None = None
    assistant_message: str | None = None
    # NOTE(review): the unit/epoch is not stated here (seconds vs.
    # milliseconds) — confirm with the producer of this payload.
    timestamp: int | None = None
    # Free-form extra data; ``default_factory`` gives each instance its own
    # empty dict when the field is omitted.
    metadata: dict[str, Any] = Field(default_factory=dict)
class SessionUserRequest(BaseModel):
    """Request payload that identifies a user by ``user_id`` and ``user_key``."""

    # Both fields are required and must be non-empty strings.
    user_id: str = Field(min_length=1)
    # NOTE(review): presumably a per-user credential — confirm with the handler.
    user_key: str = Field(min_length=1)
class SearchRequest(BaseModel):
    """Request payload for a search: user identity, a query, and a result limit."""

    # Required, non-empty user identifiers.
    user_id: str = Field(min_length=1)
    user_key: str = Field(min_length=1)
    # Optional; defaults to None.
    # NOTE(review): presumably restricts the search to one session — confirm.
    session_id: str | None = None
    # Required, non-empty search text.
    query: str = Field(min_length=1)
    # Off by default.
    # NOTE(review): what the LLM is used for is not visible in this module —
    # confirm against the search handler.
    use_llm: bool = False
    # Defaults to 10; values outside the inclusive range 1..100 are rejected.
    limit: int = Field(default=10, ge=1, le=100)
class BackendStatus(BaseModel):
    """Outcome reported for a single backend: status, optional result and error."""

    # Required; one of the OperationStatus literals.
    status: OperationStatus
    # Untyped backend payload; defaults to None.
    result: Any = None
    # Optional error text; defaults to None.
    error: str | None = None
class UserCreateRequest(BaseModel):
    """Request payload containing only the ``user_id`` of the user to create."""

    # Required, non-empty string.
    user_id: str = Field(min_length=1)
class AccountResponse(BaseModel):
    """Response with an overall status, an account payload, and backend results."""

    # Required overall outcome.
    status: OperationStatus
    # Untyped account payload; defaults to None.
    account: Any = None
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]
class MessageIngestResponse(BaseModel):
    """Response with an overall status, a message count, and backend results."""

    # Required overall outcome.
    status: OperationStatus
    # Required integer.
    # NOTE(review): presumably the number of messages ingested — confirm.
    message_count: int
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]
class CommitResponse(BaseModel):
    """Response with an overall status and per-backend results."""

    # Required overall outcome.
    status: OperationStatus
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]
class ExtractResponse(BaseModel):
    """Response with an overall status and per-backend results."""

    # Required overall outcome.
    status: OperationStatus
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]
class SearchResponse(BaseModel):
    """Response with an overall status, a list of items, and backend results."""

    # Required overall outcome.
    status: OperationStatus
    # List of string-keyed dicts; a fresh empty list when omitted.
    # NOTE(review): the per-item key schema is not defined in this module.
    items: list[dict[str, Any]] = Field(default_factory=list)
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]
class ProfileResponse(BaseModel):
    """Response with an overall status, a profile payload, and backend results."""

    # Required overall outcome.
    status: OperationStatus
    # Untyped profile payload; defaults to None.
    profile: Any = None
    # Required mapping of string key to that backend's outcome.
    # NOTE(review): keys are presumably backend names — confirm with the caller.
    backends: dict[str, BackendStatus]