Update memory system skill and plugin

This commit is contained in:
2026-05-19 14:30:55 +08:00
parent ff514ad1f5
commit 92632553ab
9 changed files with 1126 additions and 1 deletions

View File

@ -0,0 +1,51 @@
# Hermes Memory System Plugin
This Hermes memory provider talks to the Memory System API instead of calling OpenViking or EverOS directly.
It stores completed Hermes turns through:
- `POST /memory-system/messages`
- `POST /memory-system/sessions/{session_id}/commit` on session end
- `POST /memory-system/search` for recall
- `GET /memory-system/users/{user_id}/profile` for user profile reads
## Configure
Put these values in the Hermes profile env file, usually `~/.hermes/.env`:
```dotenv
MEMORY_SYSTEM_ENDPOINT=http://127.0.0.1:1934
MEMORY_SYSTEM_USER_ID=default
MEMORY_SYSTEM_API_KEY=
MEMORY_SYSTEM_SEARCH_USE_LLM=false
MEMORY_SYSTEM_COMMIT_EVERY_TURNS=5
MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS=300
MEMORY_SYSTEM_TIMEOUT_SECONDS=180
```
You can also keep a separate file and point to it with `MEMORY_SYSTEM_ENV_FILE`.
Real environment variables still override file values.
Then select this provider in Hermes memory config:
```yaml
memory:
provider: memory_system
```
## Tools
- `memory_system_search`: search OpenViking and EverOS via Memory System API.
- `memory_system_profile`: read the EverOS profile memory for the active user.
- `memory_system_remember`: explicitly write an important memory and commit the session.
The plugin commits after 5 new turns or 300 seconds by default, whichever comes first.
Set either value to `0` to disable that trigger. Session end still commits any new turns that were not already committed.
`MEMORY_SYSTEM_TIMEOUT_SECONDS` should be long enough for commit/search calls that wait on EverOS LLM extraction or rerank services. The default is 180 seconds.
If commit returns `partial_success`, the plugin logs the response and does not mark the pending turns as committed, so a later periodic commit or session-end commit can retry EverOS flush.
Search responses from current Memory System API versions do not include raw `vector` fields. The API strips those large embedding arrays before returning merged results or backend debug payloads.
The plugin is intentionally thin. User identity, session identity, backend writes, OpenViking commit, and EverOS flush stay owned by Memory System API.

View File

@ -0,0 +1,565 @@
"""Hermes memory provider for Memory System API.
Memory System API wraps OpenViking session memory and EverOS user profile
memory behind one small HTTP surface. This plugin keeps Hermes integration
thin: completed turns are written to the API, session end triggers commit,
and tools expose search/profile/explicit remember operations.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
_DEFAULT_ENDPOINT = "http://127.0.0.1:1934"
_DEFAULT_TIMEOUT = 180.0
_CONFIG_KEYS = {
"MEMORY_SYSTEM_ENDPOINT",
"MEMORY_SYSTEM_API_KEY",
"MEMORY_SYSTEM_USER_ID",
"MEMORY_SYSTEM_SEARCH_USE_LLM",
"MEMORY_SYSTEM_COMMIT_EVERY_TURNS",
"MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS",
"MEMORY_SYSTEM_TIMEOUT_SECONDS",
}
def _get_httpx():
try:
import httpx
return httpx
except ImportError:
return None
def _env_bool(name: str, default: bool = False) -> bool:
return _bool_value(_memory_system_config().get(name), default)
def _bool_value(value: Optional[str], default: bool = False) -> bool:
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def _env_int(name: str, default: int) -> int:
return _int_value(_memory_system_config().get(name), default)
def _int_value(value: Optional[str], default: int) -> int:
if value is None or value.strip() == "":
return default
try:
return int(value)
except ValueError:
return default
def _env_float(name: str, default: float) -> float:
return _float_value(_memory_system_config().get(name), default)
def _float_value(value: Optional[str], default: float) -> float:
if value is None or value.strip() == "":
return default
try:
return float(value)
except ValueError:
return default
def _parse_env_file(path: Path) -> Dict[str, str]:
values: Dict[str, str] = {}
try:
lines = path.read_text(encoding="utf-8").splitlines()
except OSError:
return values
for line in lines:
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if stripped.startswith("export "):
stripped = stripped[len("export ") :].strip()
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
if key not in _CONFIG_KEYS:
continue
value = value.strip().strip('"').strip("'")
values[key] = value
return values
def _memory_system_config(hermes_home: str = "") -> Dict[str, str]:
candidates: List[Path] = []
explicit_file = os.environ.get("MEMORY_SYSTEM_ENV_FILE", "")
if explicit_file:
candidates.append(Path(explicit_file).expanduser())
if hermes_home:
candidates.append(Path(hermes_home).expanduser() / ".env")
candidates.append(Path(hermes_home).expanduser() / "memory_system.env")
env_hermes_home = os.environ.get("HERMES_HOME", "")
if env_hermes_home:
candidates.append(Path(env_hermes_home).expanduser() / ".env")
candidates.append(Path(env_hermes_home).expanduser() / "memory_system.env")
candidates.extend(
[
Path.home() / ".hermes" / ".env",
Path.home() / ".hermes" / "memory_system.env",
Path.cwd() / "memory_system.env",
Path.cwd() / ".env",
]
)
config: Dict[str, str] = {}
seen: set[Path] = set()
for path in candidates:
resolved = path.expanduser()
if resolved in seen:
continue
seen.add(resolved)
config.update(_parse_env_file(resolved))
for key in _CONFIG_KEYS:
if key in os.environ:
config[key] = os.environ[key]
return config
class _MemorySystemClient:
"""Small sync HTTP client for Memory System API."""
def __init__(self, endpoint: str, api_key: str = "", timeout: float = _DEFAULT_TIMEOUT):
self._endpoint = endpoint.rstrip("/")
self._api_key = api_key
self._timeout = timeout
self._httpx = _get_httpx()
if self._httpx is None:
raise ImportError("httpx is required for memory_system: pip install httpx")
def _headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self._api_key:
headers["X-API-Key"] = self._api_key
return headers
def _url(self, path: str) -> str:
return f"{self._endpoint}{path}"
def get(self, path: str) -> Dict[str, Any]:
response = self._httpx.get(self._url(path), headers=self._headers(), timeout=self._timeout)
response.raise_for_status()
return response.json()
def post(self, path: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
response = self._httpx.post(
self._url(path),
json=payload or {},
headers=self._headers(),
timeout=self._timeout,
)
response.raise_for_status()
return response.json()
def health(self) -> bool:
try:
response = self._httpx.get(self._url("/memory-system/health"), timeout=3.0)
return response.status_code == 200
except Exception:
return False
SEARCH_SCHEMA = {
"name": "memory_system_search",
"description": (
"Search persistent memory through Memory System API. "
"By default this uses fast hybrid search; set use_llm=true for agentic search."
),
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query."},
"use_llm": {
"type": "boolean",
"description": "Use agentic LLM search instead of hybrid search.",
},
"limit": {"type": "integer", "description": "Maximum results, default 10."},
"session_id": {
"type": "string",
"description": "Optional session override. Defaults to the active Hermes session.",
},
},
"required": ["query"],
},
}
PROFILE_SCHEMA = {
"name": "memory_system_profile",
"description": "Read the current user's profile memory from Memory System API.",
"parameters": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "Optional user override. Defaults to the active Hermes user.",
}
},
},
}
REMEMBER_SCHEMA = {
"name": "memory_system_remember",
"description": (
"Store an important memory through Memory System API and commit the active session. "
"Use this when information should be remembered beyond the current conversation."
),
"parameters": {
"type": "object",
"properties": {
"content": {"type": "string", "description": "Memory text to store."},
"session_id": {
"type": "string",
"description": "Optional session override. Defaults to the active Hermes session.",
},
},
"required": ["content"],
},
}
class MemorySystemMemoryProvider(MemoryProvider):
"""Hermes MemoryProvider backed by Memory System API."""
@property
def name(self) -> str:
return "memory_system"
def __init__(self) -> None:
config = _memory_system_config()
self._endpoint = config.get("MEMORY_SYSTEM_ENDPOINT", _DEFAULT_ENDPOINT)
self._api_key = config.get("MEMORY_SYSTEM_API_KEY", "")
self._user_id = config.get("MEMORY_SYSTEM_USER_ID", "default")
self._session_id = ""
self._client: Optional[_MemorySystemClient] = None
self._sync_thread: Optional[threading.Thread] = None
self._prefetch_thread: Optional[threading.Thread] = None
self._prefetch_lock = threading.Lock()
self._prefetch_result = ""
self._turn_count = 0
self._last_commit_turn = 0
self._last_commit_time = time.monotonic()
self._commit_every_turns = _env_int("MEMORY_SYSTEM_COMMIT_EVERY_TURNS", 5)
self._commit_interval_seconds = _env_int("MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS", 300)
self._timeout = _env_float("MEMORY_SYSTEM_TIMEOUT_SECONDS", _DEFAULT_TIMEOUT)
self._default_use_llm = _env_bool("MEMORY_SYSTEM_SEARCH_USE_LLM", False)
def is_available(self) -> bool:
return bool(_memory_system_config().get("MEMORY_SYSTEM_ENDPOINT")) and _get_httpx() is not None
def get_config_schema(self) -> List[Dict[str, Any]]:
return [
{
"key": "endpoint",
"description": "Memory System API endpoint.",
"required": True,
"default": _DEFAULT_ENDPOINT,
"env_var": "MEMORY_SYSTEM_ENDPOINT",
},
{
"key": "api_key",
"description": "Memory System API key, if server.api_key is configured.",
"secret": True,
"required": False,
"env_var": "MEMORY_SYSTEM_API_KEY",
},
{
"key": "user_id",
"description": "Default Memory System user id for this Hermes profile.",
"required": False,
"default": "default",
"env_var": "MEMORY_SYSTEM_USER_ID",
},
{
"key": "commit_every_turns",
"description": "Commit after this many new turns. Set 0 to disable turn-based commits.",
"required": False,
"default": 5,
"env_var": "MEMORY_SYSTEM_COMMIT_EVERY_TURNS",
},
{
"key": "commit_interval_seconds",
"description": "Commit after this many seconds if new turns exist. Set 0 to disable time-based commits.",
"required": False,
"default": 300,
"env_var": "MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS",
},
{
"key": "timeout_seconds",
"description": "HTTP timeout for Memory System API requests. Commit may wait for EverOS LLM extraction.",
"required": False,
"default": _DEFAULT_TIMEOUT,
"env_var": "MEMORY_SYSTEM_TIMEOUT_SECONDS",
},
]
def initialize(self, session_id: str, **kwargs) -> None:
config = _memory_system_config(str(kwargs.get("hermes_home") or ""))
self._endpoint = config.get("MEMORY_SYSTEM_ENDPOINT", _DEFAULT_ENDPOINT)
self._api_key = config.get("MEMORY_SYSTEM_API_KEY", "")
self._user_id = (
config.get("MEMORY_SYSTEM_USER_ID")
or kwargs.get("user_id")
or kwargs.get("agent_identity")
or "default"
)
self._session_id = session_id
self._default_use_llm = _bool_value(config.get("MEMORY_SYSTEM_SEARCH_USE_LLM"), False)
self._commit_every_turns = _int_value(config.get("MEMORY_SYSTEM_COMMIT_EVERY_TURNS"), 5)
self._commit_interval_seconds = _int_value(
config.get("MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS"), 300
)
self._timeout = _float_value(config.get("MEMORY_SYSTEM_TIMEOUT_SECONDS"), _DEFAULT_TIMEOUT)
self._last_commit_turn = 0
self._last_commit_time = time.monotonic()
try:
client = _MemorySystemClient(self._endpoint, self._api_key, self._timeout)
if not client.health():
logger.warning("Memory System API health check failed: %s", self._endpoint)
return
self._client = client
except Exception as exc:
logger.warning("Memory System API initialization failed: %s", exc)
self._client = None
def system_prompt_block(self) -> str:
if not self._client:
return ""
return (
"# Memory System\n"
"Persistent memory is active. Use memory_system_search for recall, "
"memory_system_profile for user profile, and memory_system_remember "
"for important information that should be stored."
)
def prefetch(self, query: str, *, session_id: str = "") -> str:
if self._prefetch_thread and self._prefetch_thread.is_alive():
self._prefetch_thread.join(timeout=3.0)
with self._prefetch_lock:
result = self._prefetch_result
self._prefetch_result = ""
if not result:
return ""
return f"## Memory System Context\n{result}"
def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
if not self._client or not query:
return
def _run() -> None:
try:
response = self._client.post(
"/memory-system/search",
{
"user_id": self._user_id,
"session_id": session_id or self._session_id,
"query": query,
"use_llm": self._default_use_llm,
"limit": 5,
},
)
formatted = self._format_items(response.get("items", []), limit=5)
if formatted:
with self._prefetch_lock:
self._prefetch_result = formatted
except Exception as exc:
logger.debug("Memory System prefetch failed: %s", exc)
self._prefetch_thread = threading.Thread(
target=_run, daemon=True, name="memory-system-prefetch"
)
self._prefetch_thread.start()
def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None:
if not self._client:
return
self._turn_count += 1
def _sync() -> None:
try:
payload = self._message_payload(
session_id=session_id or self._session_id,
user_message=user_content,
assistant_message=assistant_content,
)
self._client.post("/memory-system/messages", payload)
if self._should_commit_now():
self._commit_session(session_id or self._session_id)
except Exception as exc:
logger.debug("Memory System sync_turn failed: %s", exc)
if self._sync_thread and self._sync_thread.is_alive():
self._sync_thread.join(timeout=5.0)
self._sync_thread = threading.Thread(target=_sync, daemon=True, name="memory-system-sync")
self._sync_thread.start()
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
if not self._client:
return
if self._sync_thread and self._sync_thread.is_alive():
self._sync_thread.join(timeout=10.0)
if self._turn_count == 0 or self._last_commit_turn >= self._turn_count:
return
try:
self._commit_session(self._session_id)
except Exception as exc:
logger.warning("Memory System session commit failed: %s", exc)
def on_memory_write(self, action: str, target: str, content: str) -> None:
if action != "add" or not content:
return
self._remember(content, session_id=self._session_id, commit=False)
def get_tool_schemas(self) -> List[Dict[str, Any]]:
return [SEARCH_SCHEMA, PROFILE_SCHEMA, REMEMBER_SCHEMA]
def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
if not self._client:
return tool_error("Memory System API is not connected")
try:
if tool_name == "memory_system_search":
return self._tool_search(args)
if tool_name == "memory_system_profile":
return self._tool_profile(args)
if tool_name == "memory_system_remember":
return self._tool_remember(args)
return tool_error(f"Unknown tool: {tool_name}")
except Exception as exc:
return tool_error(str(exc))
def shutdown(self) -> None:
for thread in (self._sync_thread, self._prefetch_thread):
if thread and thread.is_alive():
thread.join(timeout=5.0)
def _message_payload(
self,
*,
session_id: str,
user_message: str = "",
assistant_message: str = "",
) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"user_id": self._user_id,
"session_id": session_id,
"metadata": {"source": "hermes", "provider": "memory_system"},
}
if user_message:
payload["user_message"] = user_message[:4000]
if assistant_message:
payload["assistant_message"] = assistant_message[:4000]
return payload
def _format_items(self, items: List[Dict[str, Any]], *, limit: int) -> str:
parts = []
for item in items[:limit]:
text = (
item.get("content")
or item.get("text")
or item.get("memory")
or item.get("summary")
or json.dumps(item, ensure_ascii=False)
)
source = item.get("source") or item.get("backend") or "memory"
score = item.get("score")
prefix = f"[{source}]"
if isinstance(score, (int, float)):
prefix = f"{prefix} {score:.2f}"
parts.append(f"- {prefix} {text}")
return "\n".join(parts)
def _should_commit_now(self) -> bool:
if self._last_commit_turn >= self._turn_count:
return False
new_turns = self._turn_count - self._last_commit_turn
if self._commit_every_turns > 0 and new_turns >= self._commit_every_turns:
return True
elapsed = time.monotonic() - self._last_commit_time
return self._commit_interval_seconds > 0 and elapsed >= self._commit_interval_seconds
def _commit_session(self, session_id: str) -> Dict[str, Any]:
response = self._client.post(
f"/memory-system/sessions/{session_id}/commit",
{"user_id": self._user_id},
)
if response.get("status") == "success":
self._last_commit_turn = self._turn_count
self._last_commit_time = time.monotonic()
else:
logger.warning("Memory System commit did not fully succeed: %s", response)
return response
def _tool_search(self, args: Dict[str, Any]) -> str:
query = str(args.get("query", "")).strip()
if not query:
return tool_error("query is required")
limit = int(args.get("limit") or 10)
limit = max(1, min(limit, 100))
payload = {
"user_id": self._user_id,
"session_id": args.get("session_id") or self._session_id,
"query": query,
"use_llm": bool(args.get("use_llm", self._default_use_llm)),
"limit": limit,
}
return json.dumps(self._client.post("/memory-system/search", payload), ensure_ascii=False)
def _tool_profile(self, args: Dict[str, Any]) -> str:
user_id = str(args.get("user_id") or self._user_id).strip()
if not user_id:
return tool_error("user_id is required")
return json.dumps(
self._client.get(f"/memory-system/users/{user_id}/profile"),
ensure_ascii=False,
)
def _tool_remember(self, args: Dict[str, Any]) -> str:
content = str(args.get("content", "")).strip()
if not content:
return tool_error("content is required")
session_id = str(args.get("session_id") or self._session_id).strip()
return json.dumps(self._remember(content, session_id=session_id, commit=True), ensure_ascii=False)
def _remember(self, content: str, *, session_id: str, commit: bool) -> Dict[str, Any]:
if not self._client:
return {"error": "Memory System API is not connected"}
response = self._client.post(
"/memory-system/messages",
self._message_payload(session_id=session_id, user_message=content),
)
if commit:
commit_response = self._client.post(
f"/memory-system/sessions/{session_id}/commit",
{"user_id": self._user_id},
)
return {"status": response.get("status"), "write": response, "commit": commit_response}
return response
def register(ctx) -> None:
ctx.register_memory_provider(MemorySystemMemoryProvider())

View File

@ -0,0 +1,7 @@
MEMORY_SYSTEM_ENDPOINT=http://127.0.0.1:1934
MEMORY_SYSTEM_USER_ID=default
MEMORY_SYSTEM_API_KEY=
MEMORY_SYSTEM_SEARCH_USE_LLM=false
MEMORY_SYSTEM_COMMIT_EVERY_TURNS=5
MEMORY_SYSTEM_COMMIT_INTERVAL_SECONDS=300
MEMORY_SYSTEM_TIMEOUT_SECONDS=180

View File

@ -0,0 +1,9 @@
name: memory_system
version: 0.1.1
description: "Memory System API provider for Hermes, combining OpenViking session memory and EverOS user profile memory."
pip_dependencies:
- httpx
requires_env:
- MEMORY_SYSTEM_ENDPOINT
hooks:
- on_session_end