314 lines
14 KiB
Python
314 lines
14 KiB
Python
"""Client for the external EverMemOS consolidation service."""
|
|
from __future__ import annotations
|
|
|
|
from json import JSONDecodeError
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .backend_contracts import BackendCommitResult, BackendOperation, BackendResultStatus, BackendRetrieveResult, BackendWriteResult
|
|
from .backend_normalization import (
|
|
map_backend_error_to_retryable,
|
|
normalize_evermemos_commit_response,
|
|
normalize_evermemos_ingest_response,
|
|
normalize_evermemos_retrieve_response,
|
|
)
|
|
from .config import get_config
|
|
from .schemas import AccessContext, EpisodeRecord, MemoryRecord
|
|
from .schemas_v2 import BackendType
|
|
|
|
|
|
class EverMemOSError(RuntimeError):
|
|
"""Raised when the external EverMemOS service cannot consolidate."""
|
|
|
|
|
|
class EverMemOSClient:
|
|
"""Small HTTP client with a tolerant response normalizer.
|
|
|
|
The deployed EverMemOS API may evolve independently from Memory Gateway.
|
|
Gateway sends a stable payload and accepts several common response shapes:
|
|
`result`, `data`, or the raw top-level object with `candidates/promoted`.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
base_url: str | None = None,
|
|
api_key: str | None = None,
|
|
timeout: int | None = None,
|
|
enabled: bool | None = None,
|
|
mode: str | None = None,
|
|
verify_ssl: bool | None = None,
|
|
health_path: str | None = None,
|
|
ingest_path: str | None = None,
|
|
consolidate_path: str | None = None,
|
|
transport: httpx.BaseTransport | None = None,
|
|
) -> None:
|
|
config = get_config().evermemos
|
|
self.base_url = (base_url if base_url is not None else config.url).rstrip("/")
|
|
self.api_key = api_key if api_key is not None else config.api_key
|
|
self.timeout = timeout or config.timeout
|
|
self.enabled = config.enabled if enabled is None else enabled
|
|
self.mode = mode or config.mode
|
|
self.verify_ssl = config.verify_ssl if verify_ssl is None else verify_ssl
|
|
self.health_path = health_path or config.health_path
|
|
self.ingest_path = ingest_path or config.ingest_path
|
|
self.consolidate_path = consolidate_path or config.consolidate_path
|
|
self.transport = transport
|
|
|
|
def _headers(self) -> dict[str, str]:
|
|
headers = {"Content-Type": "application/json"}
|
|
if self.api_key:
|
|
headers["X-API-Key"] = self.api_key
|
|
headers["Authorization"] = f"Bearer {self.api_key}"
|
|
return headers
|
|
|
|
def health(self) -> dict[str, Any]:
|
|
url = self.base_url + self.health_path
|
|
try:
|
|
health_timeout = httpx.Timeout(min(self.timeout, 2.0), connect=min(self.timeout, 0.5))
|
|
with httpx.Client(timeout=health_timeout, headers=self._headers()) as client:
|
|
response = client.get(url)
|
|
response.raise_for_status()
|
|
return {"status": "ok", "url": self.base_url, "response": response.json()}
|
|
except Exception as exc: # noqa: BLE001
|
|
return {"status": "error", "url": self.base_url, "error": str(exc)}
|
|
|
|
def ingest_message(self, payload: dict[str, Any]) -> BackendWriteResult:
|
|
"""v2 adapter placeholder for message-level EverMemOS ingestion.
|
|
|
|
Mapping spec: `backend_adapter_mapping.AdapterMappingSpec` maps
|
|
EverMemOS ingest_turn to this method and requires BackendWriteResult.
|
|
Payloads must contain only control-plane fields; raw request bodies are
|
|
not persisted by the Gateway control-plane store.
|
|
|
|
TODO(v2): bind this to EverMemOS `/api/v1/memories` or its stable
|
|
message ingestion API after the external contract settles.
|
|
"""
|
|
runtime_payload = self._build_ingest_payload(payload)
|
|
if self._use_real_api:
|
|
return self._ingest_message_real(runtime_payload)
|
|
raw = {
|
|
"status": "skipped",
|
|
"memory_id": runtime_payload.get("turn_id"),
|
|
"metadata": {
|
|
"reason": "evermemos_v2_ingest_adapter_not_configured",
|
|
"schema_version": "evermemos.fixture.ingest.v2",
|
|
},
|
|
}
|
|
return self._normalize_ingest_response(raw)
|
|
|
|
@property
|
|
def _use_real_api(self) -> bool:
|
|
# Real ingest is strictly gated by mode=real. The legacy `enabled`
|
|
# field is retained for config compatibility, but must not trigger
|
|
# network traffic by itself.
|
|
return self.mode == "real"
|
|
|
|
def _ingest_message_real(self, runtime_payload: dict[str, Any]) -> BackendWriteResult:
|
|
if not self.base_url:
|
|
return self._failed_ingest_result(
|
|
error_code="config_error",
|
|
error_message="EverMemOS real ingest is enabled but base_url is missing",
|
|
retryable=False,
|
|
)
|
|
try:
|
|
with httpx.Client(
|
|
base_url=self.base_url,
|
|
headers=self._headers(),
|
|
timeout=self.timeout,
|
|
verify=self.verify_ssl,
|
|
transport=self.transport,
|
|
) as client:
|
|
response = client.post(self.ingest_path, json=runtime_payload)
|
|
if response.status_code >= 400:
|
|
return self._failed_ingest_result(
|
|
error_code=f"http_{response.status_code}",
|
|
error_message=f"EverMemOS ingest failed with HTTP {response.status_code}",
|
|
retryable=self._map_error(response),
|
|
)
|
|
try:
|
|
raw = response.json()
|
|
except (JSONDecodeError, ValueError):
|
|
return self._failed_ingest_result(
|
|
error_code="invalid_json",
|
|
error_message="EverMemOS ingest returned invalid JSON",
|
|
retryable=True,
|
|
)
|
|
if not isinstance(raw, dict):
|
|
return self._failed_ingest_result(
|
|
error_code="unexpected_response",
|
|
error_message="EverMemOS ingest returned an unexpected response shape",
|
|
retryable=True,
|
|
)
|
|
return self._normalize_ingest_response(raw)
|
|
except httpx.TimeoutException as exc:
|
|
return self._failed_ingest_result("timeout", self._safe_error_message(exc), retryable=self._map_error(exc))
|
|
except httpx.RequestError as exc:
|
|
return self._failed_ingest_result("network_error", self._safe_error_message(exc), retryable=self._map_error(exc))
|
|
except Exception as exc: # noqa: BLE001
|
|
return self._failed_ingest_result("unexpected_error", self._safe_error_message(exc), retryable=self._map_error(exc))
|
|
|
|
def extract_profile_long_term_v2(self, payload: dict[str, Any]) -> BackendCommitResult:
|
|
"""v2 adapter placeholder for profile / long-term extraction.
|
|
|
|
Mapping spec: commit_session returns BackendCommitResult and should
|
|
produce native episodic/profile/long-term refs once the real API is stable.
|
|
"""
|
|
runtime_payload = self._build_commit_payload(payload)
|
|
raw = {
|
|
"status": "success",
|
|
"session_id": runtime_payload.get("session_id"),
|
|
"metadata": {
|
|
"reason": "evermemos_v2_commit_fixture",
|
|
"schema_version": "evermemos.fixture.commit.v2",
|
|
},
|
|
"data": {
|
|
"produced_refs": [
|
|
{
|
|
"ref_type": "profile",
|
|
"profile_id": f"em_profile:{runtime_payload.get('user_id') or 'unknown'}",
|
|
"metadata": {"schema_version": "evermemos.fixture.profile.v2"},
|
|
},
|
|
{
|
|
"ref_type": "long_term_memory",
|
|
"memory_id": f"em_long_term:{runtime_payload.get('session_id')}",
|
|
"metadata": {"schema_version": "evermemos.fixture.long_term.v2"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
return self._normalize_commit_response(raw)
|
|
|
|
def retrieve_context_v2(self, payload: dict[str, Any]) -> BackendRetrieveResult:
|
|
"""v2 adapter placeholder for episodic/profile/long-term retrieval.
|
|
|
|
Mapping spec: retrieve_context returns BackendRetrieveResult with
|
|
normalized context items, not raw backend payload dumps.
|
|
"""
|
|
raw = {
|
|
"status": "success",
|
|
"metadata": {
|
|
"reason": "evermemos_v2_retrieve_fixture",
|
|
"schema_version": "evermemos.fixture.retrieve.v2",
|
|
},
|
|
"data": {
|
|
"items": [
|
|
{
|
|
"text": "EverMemOS fixture profile context.",
|
|
"profile_id": f"em_profile:{payload.get('user_id') or 'unknown'}",
|
|
"score": 0.72,
|
|
"memory_type": "profile",
|
|
"metadata": {"schema_version": "evermemos.fixture.retrieve.item.v2"},
|
|
},
|
|
{
|
|
"text": "EverMemOS fixture long-term memory context.",
|
|
"memory_id": f"em_long_term:{payload.get('session_id') or 'unknown'}",
|
|
"score": 0.69,
|
|
"memory_type": "long_term_memory",
|
|
"metadata": {"schema_version": "evermemos.fixture.retrieve.item.v2"},
|
|
},
|
|
]
|
|
},
|
|
}
|
|
return self._normalize_retrieve_response(raw)
|
|
|
|
def _build_ingest_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
# Runtime-only adapter payload. It may include conversation content for
|
|
# the current request lifecycle; callers must not persist it to SQLite.
|
|
return dict(payload)
|
|
|
|
def _build_commit_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
return dict(payload)
|
|
|
|
def _normalize_ingest_response(self, raw: dict[str, Any]) -> BackendWriteResult:
|
|
return normalize_evermemos_ingest_response(raw)
|
|
|
|
def _normalize_commit_response(self, raw: dict[str, Any]) -> BackendCommitResult:
|
|
return normalize_evermemos_commit_response(raw)
|
|
|
|
def _normalize_retrieve_response(self, raw: dict[str, Any]) -> BackendRetrieveResult:
|
|
return normalize_evermemos_retrieve_response(raw)
|
|
|
|
def _map_error(self, exc_or_response: Any) -> bool:
|
|
status_code = getattr(exc_or_response, "status_code", None)
|
|
error_code = getattr(exc_or_response, "error_code", None)
|
|
error_message = str(exc_or_response) if exc_or_response is not None else None
|
|
return map_backend_error_to_retryable(
|
|
BackendType.EVERMEMOS,
|
|
status_code=status_code,
|
|
error_code=error_code,
|
|
error_message=error_message,
|
|
)
|
|
|
|
def _failed_ingest_result(self, error_code: str, error_message: str, retryable: bool) -> BackendWriteResult:
|
|
return BackendWriteResult(
|
|
backend_type=BackendType.EVERMEMOS,
|
|
operation=BackendOperation.INGEST_TURN,
|
|
status=BackendResultStatus.FAILED,
|
|
retryable=retryable,
|
|
error_code=error_code,
|
|
error_message=error_message,
|
|
metadata={"error_code": error_code},
|
|
)
|
|
|
|
def _safe_error_message(self, exc: Exception) -> str:
|
|
return exc.__class__.__name__
|
|
|
|
def consolidate_session(
|
|
self,
|
|
session_id: str,
|
|
ctx: AccessContext,
|
|
episodes: list[EpisodeRecord],
|
|
existing_memories: list[MemoryRecord],
|
|
min_importance: float,
|
|
target_namespace: str | None,
|
|
) -> dict[str, Any]:
|
|
payload = {
|
|
"schema_version": "memory-gateway.evermemos.consolidate.v1",
|
|
"session_id": session_id,
|
|
"context": ctx.model_dump(mode="json"),
|
|
"min_importance": min_importance,
|
|
"target_namespace": target_namespace,
|
|
"episodes": [episode.model_dump(mode="json") for episode in episodes],
|
|
"existing_memories": [memory.model_dump(mode="json") for memory in existing_memories],
|
|
}
|
|
paths = [
|
|
self.consolidate_path,
|
|
"/v1/sessions/consolidate",
|
|
"/v1/memory/consolidate",
|
|
"/api/v1/sessions/consolidate",
|
|
"/api/consolidate",
|
|
"/consolidate",
|
|
]
|
|
errors: list[str] = []
|
|
for path in dict.fromkeys(paths):
|
|
try:
|
|
with httpx.Client(timeout=self.timeout, headers=self._headers()) as client:
|
|
response = client.post(self.base_url + path, json=payload)
|
|
if response.status_code == 404:
|
|
errors.append(f"{path}: 404")
|
|
continue
|
|
response.raise_for_status()
|
|
return self._normalize_response(response.json(), path)
|
|
except Exception as exc: # noqa: BLE001
|
|
errors.append(f"{path}: {exc}")
|
|
if "Connection refused" in str(exc) or "timed out" in str(exc):
|
|
break
|
|
raise EverMemOSError("; ".join(errors) or "EverMemOS consolidation failed")
|
|
|
|
def _normalize_response(self, payload: dict[str, Any], path: str) -> dict[str, Any]:
|
|
data = payload.get("result") or payload.get("data") or payload
|
|
return {
|
|
"backend": "external",
|
|
"service_url": self.base_url,
|
|
"endpoint": path,
|
|
"raw": payload,
|
|
"session_id": data.get("session_id"),
|
|
"episodes": data.get("episodes"),
|
|
"candidates": data.get("candidates") or data.get("candidate_memories") or [],
|
|
"promoted": data.get("promoted") or data.get("promoted_memories") or data.get("memories") or [],
|
|
"duplicates": data.get("duplicates") or [],
|
|
"conflicts": data.get("conflicts") or [],
|
|
"review_drafts": data.get("review_drafts") or [],
|
|
}
|