"""Client for the external EverMemOS consolidation service.""" from __future__ import annotations from json import JSONDecodeError from typing import Any import httpx from .backend_contracts import BackendCommitResult, BackendOperation, BackendResultStatus, BackendRetrieveResult, BackendWriteResult from .backend_normalization import ( map_backend_error_to_retryable, normalize_evermemos_commit_response, normalize_evermemos_ingest_response, normalize_evermemos_retrieve_response, ) from .config import get_config from .schemas import AccessContext, EpisodeRecord, MemoryRecord from .schemas_v2 import BackendType class EverMemOSError(RuntimeError): """Raised when the external EverMemOS service cannot consolidate.""" class EverMemOSClient: """Small HTTP client with a tolerant response normalizer. The deployed EverMemOS API may evolve independently from Memory Gateway. Gateway sends a stable payload and accepts several common response shapes: `result`, `data`, or the raw top-level object with `candidates/promoted`. """ def __init__( self, base_url: str | None = None, api_key: str | None = None, timeout: int | None = None, enabled: bool | None = None, mode: str | None = None, verify_ssl: bool | None = None, health_path: str | None = None, ingest_path: str | None = None, consolidate_path: str | None = None, transport: httpx.BaseTransport | None = None, ) -> None: config = get_config().evermemos self.base_url = (base_url if base_url is not None else config.url).rstrip("/") self.api_key = api_key if api_key is not None else config.api_key self.timeout = timeout or config.timeout self.enabled = config.enabled if enabled is None else enabled self.mode = mode or config.mode self.verify_ssl = config.verify_ssl if verify_ssl is None else verify_ssl self.health_path = health_path or config.health_path self.ingest_path = ingest_path or config.ingest_path self.consolidate_path = consolidate_path or config.consolidate_path self.transport = transport def _headers(self) -> dict[str, str]: headers = {"Content-Type": "application/json"} if self.api_key: headers["X-API-Key"] = self.api_key headers["Authorization"] = f"Bearer {self.api_key}" return headers def health(self) -> dict[str, Any]: url = self.base_url + self.health_path try: health_timeout = httpx.Timeout(min(self.timeout, 2.0), connect=min(self.timeout, 0.5)) with httpx.Client(timeout=health_timeout, headers=self._headers()) as client: response = client.get(url) response.raise_for_status() return {"status": "ok", "url": self.base_url, "response": response.json()} except Exception as exc: # noqa: BLE001 return {"status": "error", "url": self.base_url, "error": str(exc)} def ingest_message(self, payload: dict[str, Any]) -> BackendWriteResult: """v2 adapter placeholder for message-level EverMemOS ingestion. Mapping spec: `backend_adapter_mapping.AdapterMappingSpec` maps EverMemOS ingest_turn to this method and requires BackendWriteResult. Payloads must contain only control-plane fields; raw request bodies are not persisted by the Gateway control-plane store. TODO(v2): bind this to EverMemOS `/api/v1/memories` or its stable message ingestion API after the external contract settles. """ runtime_payload = self._build_ingest_payload(payload) if self._use_real_api: return self._ingest_message_real(runtime_payload) raw = { "status": "skipped", "memory_id": runtime_payload.get("turn_id"), "metadata": { "reason": "evermemos_v2_ingest_adapter_not_configured", "schema_version": "evermemos.fixture.ingest.v2", }, } return self._normalize_ingest_response(raw) @property def _use_real_api(self) -> bool: # Real ingest is strictly gated by mode=real. The legacy `enabled` # field is retained for config compatibility, but must not trigger # network traffic by itself. return self.mode == "real" def _ingest_message_real(self, runtime_payload: dict[str, Any]) -> BackendWriteResult: if not self.base_url: return self._failed_ingest_result( error_code="config_error", error_message="EverMemOS real ingest is enabled but base_url is missing", retryable=False, ) try: with httpx.Client( base_url=self.base_url, headers=self._headers(), timeout=self.timeout, verify=self.verify_ssl, transport=self.transport, ) as client: response = client.post(self.ingest_path, json=runtime_payload) if response.status_code >= 400: return self._failed_ingest_result( error_code=f"http_{response.status_code}", error_message=f"EverMemOS ingest failed with HTTP {response.status_code}", retryable=self._map_error(response), ) try: raw = response.json() except (JSONDecodeError, ValueError): return self._failed_ingest_result( error_code="invalid_json", error_message="EverMemOS ingest returned invalid JSON", retryable=True, ) if not isinstance(raw, dict): return self._failed_ingest_result( error_code="unexpected_response", error_message="EverMemOS ingest returned an unexpected response shape", retryable=True, ) return self._normalize_ingest_response(raw) except httpx.TimeoutException as exc: return self._failed_ingest_result("timeout", self._safe_error_message(exc), retryable=self._map_error(exc)) except httpx.RequestError as exc: return self._failed_ingest_result("network_error", self._safe_error_message(exc), retryable=self._map_error(exc)) except Exception as exc: # noqa: BLE001 return self._failed_ingest_result("unexpected_error", self._safe_error_message(exc), retryable=self._map_error(exc)) def extract_profile_long_term_v2(self, payload: dict[str, Any]) -> BackendCommitResult: """v2 adapter placeholder for profile / long-term extraction. Mapping spec: commit_session returns BackendCommitResult and should produce native episodic/profile/long-term refs once the real API is stable. """ runtime_payload = self._build_commit_payload(payload) raw = { "status": "success", "session_id": runtime_payload.get("session_id"), "metadata": { "reason": "evermemos_v2_commit_fixture", "schema_version": "evermemos.fixture.commit.v2", }, "data": { "produced_refs": [ { "ref_type": "profile", "profile_id": f"em_profile:{runtime_payload.get('user_id') or 'unknown'}", "metadata": {"schema_version": "evermemos.fixture.profile.v2"}, }, { "ref_type": "long_term_memory", "memory_id": f"em_long_term:{runtime_payload.get('session_id')}", "metadata": {"schema_version": "evermemos.fixture.long_term.v2"}, }, ] }, } return self._normalize_commit_response(raw) def retrieve_context_v2(self, payload: dict[str, Any]) -> BackendRetrieveResult: """v2 adapter placeholder for episodic/profile/long-term retrieval. Mapping spec: retrieve_context returns BackendRetrieveResult with normalized context items, not raw backend payload dumps. """ raw = { "status": "success", "metadata": { "reason": "evermemos_v2_retrieve_fixture", "schema_version": "evermemos.fixture.retrieve.v2", }, "data": { "items": [ { "text": "EverMemOS fixture profile context.", "profile_id": f"em_profile:{payload.get('user_id') or 'unknown'}", "score": 0.72, "memory_type": "profile", "metadata": {"schema_version": "evermemos.fixture.retrieve.item.v2"}, }, { "text": "EverMemOS fixture long-term memory context.", "memory_id": f"em_long_term:{payload.get('session_id') or 'unknown'}", "score": 0.69, "memory_type": "long_term_memory", "metadata": {"schema_version": "evermemos.fixture.retrieve.item.v2"}, }, ] }, } return self._normalize_retrieve_response(raw) def _build_ingest_payload(self, payload: dict[str, Any]) -> dict[str, Any]: # Runtime-only adapter payload. It may include conversation content for # the current request lifecycle; callers must not persist it to SQLite. return dict(payload) def _build_commit_payload(self, payload: dict[str, Any]) -> dict[str, Any]: return dict(payload) def _normalize_ingest_response(self, raw: dict[str, Any]) -> BackendWriteResult: return normalize_evermemos_ingest_response(raw) def _normalize_commit_response(self, raw: dict[str, Any]) -> BackendCommitResult: return normalize_evermemos_commit_response(raw) def _normalize_retrieve_response(self, raw: dict[str, Any]) -> BackendRetrieveResult: return normalize_evermemos_retrieve_response(raw) def _map_error(self, exc_or_response: Any) -> bool: status_code = getattr(exc_or_response, "status_code", None) error_code = getattr(exc_or_response, "error_code", None) error_message = str(exc_or_response) if exc_or_response is not None else None return map_backend_error_to_retryable( BackendType.EVERMEMOS, status_code=status_code, error_code=error_code, error_message=error_message, ) def _failed_ingest_result(self, error_code: str, error_message: str, retryable: bool) -> BackendWriteResult: return BackendWriteResult( backend_type=BackendType.EVERMEMOS, operation=BackendOperation.INGEST_TURN, status=BackendResultStatus.FAILED, retryable=retryable, error_code=error_code, error_message=error_message, metadata={"error_code": error_code}, ) def _safe_error_message(self, exc: Exception) -> str: return exc.__class__.__name__ def consolidate_session( self, session_id: str, ctx: AccessContext, episodes: list[EpisodeRecord], existing_memories: list[MemoryRecord], min_importance: float, target_namespace: str | None, ) -> dict[str, Any]: payload = { "schema_version": "memory-gateway.evermemos.consolidate.v1", "session_id": session_id, "context": ctx.model_dump(mode="json"), "min_importance": min_importance, "target_namespace": target_namespace, "episodes": [episode.model_dump(mode="json") for episode in episodes], "existing_memories": [memory.model_dump(mode="json") for memory in existing_memories], } paths = [ self.consolidate_path, "/v1/sessions/consolidate", "/v1/memory/consolidate", "/api/v1/sessions/consolidate", "/api/consolidate", "/consolidate", ] errors: list[str] = [] for path in dict.fromkeys(paths): try: with httpx.Client(timeout=self.timeout, headers=self._headers()) as client: response = client.post(self.base_url + path, json=payload) if response.status_code == 404: errors.append(f"{path}: 404") continue response.raise_for_status() return self._normalize_response(response.json(), path) except Exception as exc: # noqa: BLE001 errors.append(f"{path}: {exc}") if "Connection refused" in str(exc) or "timed out" in str(exc): break raise EverMemOSError("; ".join(errors) or "EverMemOS consolidation failed") def _normalize_response(self, payload: dict[str, Any], path: str) -> dict[str, Any]: data = payload.get("result") or payload.get("data") or payload return { "backend": "external", "service_url": self.base_url, "endpoint": path, "raw": payload, "session_id": data.get("session_id"), "episodes": data.get("episodes"), "candidates": data.get("candidates") or data.get("candidate_memories") or [], "promoted": data.get("promoted") or data.get("promoted_memories") or data.get("memories") or [], "duplicates": data.get("duplicates") or [], "conflicts": data.get("conflicts") or [], "review_drafts": data.get("review_drafts") or [], }