Files
memory-gateway/memory_gateway/everos_client.py

497 lines
20 KiB
Python

"""Client for the external EverOS memory service."""
from __future__ import annotations
from datetime import datetime, timezone
from json import JSONDecodeError
from typing import Any
import httpx
from .backend_contracts import BackendCommitResult, BackendOperation, BackendResultStatus, BackendRetrieveResult, BackendWriteResult
from .backend_normalization import (
map_backend_error_to_retryable,
normalize_everos_commit_response,
normalize_everos_ingest_response,
normalize_everos_retrieve_response,
)
from .config import get_config
from .schemas import AccessContext, EpisodeRecord, MemoryRecord
from .schemas_v2 import BackendType
class EverOSError(RuntimeError):
"""Raised when the external EverOS service cannot process a request."""
class EverOSClient:
"""Small HTTP client with a tolerant response normalizer.
The deployed EverOS API may evolve independently from Memory Gateway.
Gateway sends a stable payload and accepts several common response shapes:
`result`, `data`, or the raw top-level object with `candidates/promoted`.
"""
def __init__(
self,
base_url: str | None = None,
api_key: str | None = None,
timeout: int | None = None,
enabled: bool | None = None,
mode: str | None = None,
verify_ssl: bool | None = None,
health_path: str | None = None,
ingest_path: str | None = None,
search_path: str | None = None,
flush_path: str | None = None,
retrieve_method: str | None = None,
transport: httpx.BaseTransport | None = None,
) -> None:
config = get_config().everos
self.base_url = (base_url if base_url is not None else config.url).rstrip("/")
self.api_key = api_key if api_key is not None else config.api_key
self.timeout = timeout or config.timeout
self.enabled = config.enabled if enabled is None else enabled
self.mode = mode or config.mode
self.verify_ssl = config.verify_ssl if verify_ssl is None else verify_ssl
self.health_path = health_path or config.health_path
self.ingest_path = ingest_path or config.ingest_path
self.search_path = search_path or config.search_path
self.flush_path = flush_path or config.flush_path
self.retrieve_method = retrieve_method or config.retrieve_method
self.transport = transport
def _headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["X-API-Key"] = self.api_key
headers["Authorization"] = f"Bearer {self.api_key}"
return headers
def health(self) -> dict[str, Any]:
url = self.base_url + self.health_path
try:
health_timeout = httpx.Timeout(min(self.timeout, 2.0), connect=min(self.timeout, 0.5))
with httpx.Client(timeout=health_timeout, headers=self._headers()) as client:
response = client.get(url)
response.raise_for_status()
return {"status": "ok", "url": self.base_url, "response": response.json()}
except Exception as exc: # noqa: BLE001
return {"status": "error", "url": self.base_url, "error": str(exc)}
def ingest_message(self, payload: dict[str, Any]) -> BackendWriteResult:
"""Write one Gateway turn to EverOS."""
runtime_payload = self._build_ingest_payload(payload)
if self._use_real_api:
return self._ingest_message_real(runtime_payload)
raw = {
"status": "skipped",
"memory_id": (runtime_payload.get("messages") or [{}])[0].get("message_id"),
"metadata": {
"reason": "everos_v2_ingest_adapter_not_configured",
"schema_version": "everos.fixture.ingest.v2",
},
}
return self._normalize_ingest_response(raw)
@property
def _use_real_api(self) -> bool:
# Real ingest is strictly gated by mode=real. The legacy `enabled`
# field is retained for config compatibility, but must not trigger
# network traffic by itself.
return self.mode == "real"
def _ingest_message_real(self, runtime_payload: dict[str, Any]) -> BackendWriteResult:
if not self.base_url:
return self._failed_ingest_result(
error_code="config_error",
error_message="EverOS real ingest is enabled but base_url is missing",
retryable=False,
)
try:
with httpx.Client(
base_url=self.base_url,
headers=self._headers(),
timeout=self.timeout,
verify=self.verify_ssl,
transport=self.transport,
) as client:
response = client.post(self.ingest_path, json=runtime_payload)
if response.status_code >= 400:
return self._failed_ingest_result(
error_code=f"http_{response.status_code}",
error_message=f"EverOS ingest failed with HTTP {response.status_code}",
retryable=self._map_error(response),
)
try:
raw = response.json()
except (JSONDecodeError, ValueError):
return self._failed_ingest_result(
error_code="invalid_json",
error_message="EverOS ingest returned invalid JSON",
retryable=True,
)
if not isinstance(raw, dict):
return self._failed_ingest_result(
error_code="unexpected_response",
error_message="EverOS ingest returned an unexpected response shape",
retryable=True,
)
return self._normalize_ingest_response(raw)
except httpx.TimeoutException as exc:
return self._failed_ingest_result("timeout", self._safe_error_message(exc), retryable=self._map_error(exc))
except httpx.RequestError as exc:
return self._failed_ingest_result("network_error", self._safe_error_message(exc), retryable=self._map_error(exc))
except Exception as exc: # noqa: BLE001
return self._failed_ingest_result("unexpected_error", self._safe_error_message(exc), retryable=self._map_error(exc))
def extract_profile_long_term_v2(self, payload: dict[str, Any]) -> BackendCommitResult:
"""v2 adapter placeholder for profile / long-term extraction.
Mapping spec: commit_session returns BackendCommitResult and should
produce native episodic/profile/long-term refs once the real API is stable.
"""
runtime_payload = self._build_commit_payload(payload)
raw = {
"status": "success",
"session_id": runtime_payload.get("session_id"),
"metadata": {
"reason": "everos_v2_commit_fixture",
"schema_version": "everos.fixture.commit.v2",
},
"data": {
"produced_refs": [
{
"ref_type": "profile",
"profile_id": f"everos_profile:{runtime_payload.get('user_id') or 'unknown'}",
"metadata": {"schema_version": "everos.fixture.profile.v2"},
},
{
"ref_type": "long_term_memory",
"memory_id": f"everos_long_term:{runtime_payload.get('session_id')}",
"metadata": {"schema_version": "everos.fixture.long_term.v2"},
},
]
},
}
return self._normalize_commit_response(raw)
def retrieve_context_v2(self, payload: dict[str, Any]) -> BackendRetrieveResult:
"""
Calls EverOS native API to retrieve memories.
"""
if not self._use_real_api:
return BackendRetrieveResult(
backend_type=BackendType.EVEROS,
operation=BackendOperation.RETRIEVE_CONTEXT,
status=BackendResultStatus.SKIPPED,
items=[],
metadata={"reason": "everos_retrieve_requires_real_mode"},
)
query = payload.get("query", "")
user_id = payload.get("user_id", "")
try:
with httpx.Client(
base_url=self.base_url,
headers=self._headers(),
timeout=self.timeout,
verify=self.verify_ssl,
transport=self.transport,
) as client:
resp = client.post(
self.search_path,
json={
"query": query,
"method": self.retrieve_method,
"memory_types": ["episodic_memory", "profile", "raw_message"],
"top_k": payload.get("limit", 10),
"filters": self._search_filters(user_id=user_id, session_id=payload.get("session_id")),
},
)
if resp.status_code >= 400:
return BackendRetrieveResult(
backend_type=BackendType.EVEROS,
operation=BackendOperation.RETRIEVE_CONTEXT,
status=BackendResultStatus.FAILED,
items=[],
error_code=f"http_{resp.status_code}",
error_message=f"EverOS retrieve failed: {resp.text}",
retryable=False
)
items = self._items_from_search_response(resp.json())
raw = {
"status": "success",
"data": {
"items": items
}
}
return self._normalize_retrieve_response(raw)
except Exception as exc:
return BackendRetrieveResult(
backend_type=BackendType.EVEROS,
operation=BackendOperation.RETRIEVE_CONTEXT,
status=BackendResultStatus.FAILED,
items=[],
error_code="request_error",
error_message=str(exc),
retryable=True
)
def _build_ingest_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
"""
Builds the payload according to EverOS native message schema.
"""
return {
"user_id": payload.get("user_id") or "gateway_user",
"session_id": payload.get("session_id"),
"messages": [
{
"message_id": payload.get("turn_id") or f"msg_{int(datetime.now(timezone.utc).timestamp() * 1000)}",
"sender_id": payload.get("user_id") or "gateway_user",
"sender_name": payload.get("user_id") or "gateway_user",
"role": self._everos_role(payload.get("role", "user")),
"timestamp": self._timestamp_ms(payload),
"content": payload.get("content", ""),
}
],
}
def _build_commit_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
return dict(payload)
def _normalize_ingest_response(self, raw: dict[str, Any]) -> BackendWriteResult:
return normalize_everos_ingest_response(raw)
def _normalize_commit_response(self, raw: dict[str, Any]) -> BackendCommitResult:
return normalize_everos_commit_response(raw)
def _normalize_retrieve_response(self, raw: dict[str, Any]) -> BackendRetrieveResult:
return normalize_everos_retrieve_response(raw)
def _map_error(self, exc_or_response: Any) -> bool:
status_code = getattr(exc_or_response, "status_code", None)
error_code = getattr(exc_or_response, "error_code", None)
error_message = str(exc_or_response) if exc_or_response is not None else None
return map_backend_error_to_retryable(
BackendType.EVEROS,
status_code=status_code,
error_code=error_code,
error_message=error_message,
)
def _failed_ingest_result(self, error_code: str, error_message: str, retryable: bool) -> BackendWriteResult:
return BackendWriteResult(
backend_type=BackendType.EVEROS,
operation=BackendOperation.INGEST_TURN,
status=BackendResultStatus.FAILED,
retryable=retryable,
error_code=error_code,
error_message=error_message,
metadata={"error_code": error_code},
)
def _safe_error_message(self, exc: Exception) -> str:
return exc.__class__.__name__
def consolidate_session(
self,
session_id: str,
ctx: AccessContext,
episodes: list[EpisodeRecord],
existing_memories: list[MemoryRecord],
min_importance: float,
target_namespace: str | None,
) -> dict[str, Any]:
if not self.base_url:
raise EverOSError("EverOS real mode requires base_url")
user_id = ctx.user_id or "gateway_user"
agent_id = ctx.agent_id or "gateway_agent"
with httpx.Client(
base_url=self.base_url,
timeout=self.timeout,
headers=self._headers(),
verify=self.verify_ssl,
transport=self.transport,
) as client:
for episode in episodes:
self._memorize_episode(client, episode=episode, session_id=session_id, user_id=user_id, agent_id=agent_id)
self._flush_session(client, session_id=session_id, user_id=user_id)
promoted = self._fetch_session_memories(client, session_id=session_id, user_id=user_id, target_namespace=target_namespace)
return {
"backend": "external",
"service_url": self.base_url,
"endpoint": self.ingest_path,
"raw": {"result": {"memories": promoted}},
"session_id": session_id,
"episodes": len(episodes),
"candidates": promoted,
"promoted": promoted,
"duplicates": [],
"conflicts": [],
"review_drafts": [],
}
def _memorize_episode(
self,
client: httpx.Client,
*,
episode: dict[str, Any],
session_id: str,
user_id: str,
agent_id: str,
) -> None:
episode_data = episode.model_dump(mode="json") if hasattr(episode, "model_dump") else dict(episode)
episode_id = str(episode_data.get("id") or f"epi_{int(datetime.now(timezone.utc).timestamp())}")
sender = agent_id if episode_data.get("source") == "agent" else user_id
role = "assistant" if sender == agent_id else "user"
created_at = episode_data.get("created_at") or datetime.now(timezone.utc).isoformat()
payload = {
"user_id": user_id,
"session_id": session_id,
"messages": [
{
"message_id": episode_id,
"sender_id": sender,
"sender_name": sender,
"role": role,
"timestamp": self._datetime_to_ms(created_at),
"content": episode_data.get("content") or "",
}
],
}
response = client.post(self.ingest_path, json=payload)
response.raise_for_status()
def _flush_session(self, client: httpx.Client, *, session_id: str, user_id: str) -> None:
response = client.post(self.flush_path, json={"user_id": user_id, "session_id": session_id})
response.raise_for_status()
def _fetch_session_memories(
self,
client: httpx.Client,
*,
session_id: str,
user_id: str,
target_namespace: str | None,
) -> list[dict[str, Any]]:
response = client.post(
self.search_path,
json={
"query": "memory",
"method": self.retrieve_method,
"memory_types": ["episodic_memory"],
"top_k": 20,
"filters": self._search_filters(user_id=user_id, session_id=session_id),
},
)
response.raise_for_status()
memories = self._items_from_search_response(response.json())
normalized: list[dict[str, Any]] = []
for index, memory in enumerate(memories, start=1):
content = memory.get("text") or memory.get("content") or memory.get("summary") or ""
if not content:
continue
normalized.append(
{
"id": memory.get("memory_id") or memory.get("id") or f"everos_{session_id}_{index}",
"namespace": target_namespace or f"user/{user_id}/long_term",
"memory_type": memory.get("memory_type") or "episodic_memory",
"content": content,
"summary": memory.get("summary") or content[:180],
"tags": ["everos-real", "memory-gateway"],
"importance": 0.7,
"confidence": 0.7,
"source": "everos",
"source_ref": memory.get("memory_id") or memory.get("id"),
}
)
return normalized
def _items_from_search_response(self, payload: dict[str, Any]) -> list[dict[str, Any]]:
data = payload.get("data") if isinstance(payload.get("data"), dict) else payload
items: list[dict[str, Any]] = []
for memory_type, key in (
("episodic_memory", "episodes"),
("profile", "profiles"),
("raw_message", "raw_messages"),
):
for item in data.get(key, []) or []:
if isinstance(item, dict):
items.append({**item, "memory_type": item.get("memory_type") or memory_type, "text": self._memory_text(item)})
agent_memory = data.get("agent_memory") or {}
if isinstance(agent_memory, dict):
for item in agent_memory.get("cases", []) or []:
if isinstance(item, dict):
items.append({**item, "memory_type": "agent_case", "text": self._memory_text(item)})
for item in agent_memory.get("skills", []) or []:
if isinstance(item, dict):
items.append({**item, "memory_type": "agent_skill", "text": self._memory_text(item)})
return items
def _memory_text(self, item: dict[str, Any]) -> str:
content_items = item.get("content_items")
if isinstance(content_items, list):
content_text = "\n".join(
str(content.get("text") or content.get("content") or "")
for content in content_items
if isinstance(content, dict)
).strip()
else:
content_text = ""
profile_data = item.get("profile_data")
if isinstance(profile_data, dict):
profile_text = str(profile_data)
else:
profile_text = ""
return (
item.get("episode")
or item.get("summary")
or item.get("subject")
or item.get("atomic_fact")
or item.get("task_intent")
or item.get("approach")
or item.get("content")
or content_text
or item.get("description")
or profile_text
or ""
)
def _search_filters(self, *, user_id: str | None, session_id: str | None = None) -> dict[str, Any]:
filters: dict[str, Any] = {"user_id": user_id or "gateway_user"}
if session_id:
filters["session_id"] = session_id
return filters
def _timestamp_ms(self, payload: dict[str, Any]) -> int:
trace = payload.get("trace") if isinstance(payload.get("trace"), dict) else {}
timestamp = trace.get("timestamp") or payload.get("created_at")
if timestamp:
return self._datetime_to_ms(timestamp)
return int(datetime.now(timezone.utc).timestamp() * 1000)
def _datetime_to_ms(self, value: Any) -> int:
if isinstance(value, (int, float)):
return int(value if value > 1_000_000_000_000 else value * 1000)
if isinstance(value, str):
text = value.replace("Z", "+00:00")
try:
return int(datetime.fromisoformat(text).timestamp() * 1000)
except ValueError:
return int(datetime.now(timezone.utc).timestamp() * 1000)
if isinstance(value, datetime):
return int(value.timestamp() * 1000)
return int(datetime.now(timezone.utc).timestamp() * 1000)
def _everos_role(self, role: str) -> str:
if role in {"assistant", "agent"}:
return "assistant"
if role == "tool":
return "assistant"
return "user"