diff --git a/.gitignore b/.gitignore index f99717d..aae02ab 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ venv/ # Runtime output *.log *.tmp +*.sqlite3 +obsidian-vault/Reviews/ diff --git a/README.md b/README.md index 8cd0aaa..e294748 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,17 @@ Memory Gateway 是一个通用记忆网关,用于给 AI agent / harness 提供 - 将上传文档保存到 Obsidian vault。 - 将文档摘要和结构化 artifact 写入 OpenViking knowledge。 - 给 Hermes 提供通用 `memory-gateway` skill。 +- 新增通用 Memory Gateway v1 方案与 POC 骨架:多用户、namespace、visibility/ACL、episode、session commit、audit、skills 分层。 +- v1 metadata 默认持久化到 SQLite,覆盖 users、memories、episodes、profiles、audit。 +- `/v1/memory/search` 先做本地 ACL 过滤,再按可见 namespace 查询 OpenViking。 +- v1 MCP tools 已接入现有 `/mcp/rpc`。 +- `/v1/sessions/{session_id}/commit` 优先调用独立 EverMemOS HTTP 服务;服务不可用且允许 fallback 时,才使用 Gateway 进程内 POC worker。 + +完整方案见: + +```text +docs/generic-memory-gateway-design.md +``` ## 架构 @@ -75,6 +86,11 @@ llm: obsidian: vault_path: /home/tom/memory-gateway/obsidian-vault knowledge_dir: 01_Knowledge/Uploaded + review_dir: Reviews/Queue + +storage: + backend: sqlite + sqlite_path: /home/tom/memory-gateway/memory_gateway.sqlite3 ``` `config.yaml` 已被 `.gitignore` 忽略,不会提交密钥。 @@ -88,6 +104,17 @@ source /home/tom/OpenViking/.venv/bin/activate openviking-server --host 127.0.0.1 --port 1933 ``` +启动本机 EverMemOS 服务: + +```bash +cd /home/tom/memory-gateway +source /home/tom/OpenViking/.venv/bin/activate +python -m memory_gateway.evermemos_service \ + --config /home/tom/memory-gateway/config.yaml \ + --host 127.0.0.1 \ + --port 1995 +``` + 启动 Memory Gateway: ```bash @@ -100,6 +127,8 @@ python -m memory_gateway.server --config /home/tom/memory-gateway/config.yaml ```bash curl http://127.0.0.1:1934/health +curl http://127.0.0.1:1995/health +curl http://127.0.0.1:1934/v1/evermemos/health ``` ## REST 接口 @@ -189,6 +218,103 @@ curl -X POST http://127.0.0.1:1934/api/knowledge/upload \ obsidian-vault/01_Knowledge/Uploaded/ ``` +## v1 通用 Memory API + +v1 API 
面向多 agent 框架,带 user / agent / workspace / session 上下文和基础 ACL。 + +### 创建用户 + +```bash +curl -X POST http://127.0.0.1:1934/v1/users \ + -H "Content-Type: application/json" \ + -d '{"user_id":"user_tom","display_name":"Tom","preferences":{"language":"zh-CN"}}' +``` + +### 写入记忆 + +```bash +curl -X POST http://127.0.0.1:1934/v1/memory \ + -H "Content-Type: application/json" \ + -d '{ + "user_id": "user_tom", + "agent_id": "agent_hermes", + "workspace_id": "ws_memory_gateway", + "memory_type": "preference", + "content": "用户偏好中文输出,结构化但不要过度工程化。", + "summary": "中文、结构化、轻量 POC 优先。", + "tags": ["preference", "style"], + "importance": 0.8, + "confidence": 0.9, + "visibility": "private", + "source": "manual" + }' +``` + +### 检索记忆 + +```bash +curl -X POST http://127.0.0.1:1934/v1/memory/search \ + -H "Content-Type: application/json" \ + -d '{ + "user_id": "user_tom", + "agent_id": "agent_hermes", + "workspace_id": "ws_memory_gateway", + "query": "中文输出", + "limit": 5 + }' +``` + +返回会包含: + +- `local_total`:SQLite metadata 命中的记忆数量。 +- `openviking_total`:按可见 namespace 查询 OpenViking 的命中数量。 +- `searched_namespaces`:Gateway 展开并允许查询的 namespace。 + +### 修改记忆 + +```bash +curl -X PATCH "http://127.0.0.1:1934/v1/memory/MEMORY_ID?user_id=user_tom&agent_id=agent_hermes&workspace_id=ws_memory_gateway" \ + -H "Content-Type: application/json" \ + -d '{"summary":"用户偏好中文、结构化、少废话。","importance":0.9}' +``` + +### 写入 episode 并 commit session + +```bash +curl -X POST http://127.0.0.1:1934/v1/episodes \ + -H "Content-Type: application/json" \ + -d '{ + "user_id": "user_tom", + "agent_id": "agent_hermes", + "workspace_id": "ws_memory_gateway", + "session_id": "sess_demo", + "content": "结论:这个项目必须保留用户隔离和 namespace ACL。", + "tags": ["decision"] + }' + +curl -X POST http://127.0.0.1:1934/v1/sessions/sess_demo/commit \ + -H "Content-Type: application/json" \ + -d '{ + "user_id": "user_tom", + "agent_id": "agent_hermes", + "workspace_id": "ws_memory_gateway", + "session_id": "sess_demo", + "promote": true, + 
"min_importance": 0.6 + }' +``` + +流程说明: + +- 短期记忆先写入 SQLite 的 `episodes`,namespace 通常是 `session/{session_id}/episodic`。 +- commit session 时,Gateway 把当前 session episodes、可见长期记忆和访问上下文发给 `http://127.0.0.1:1995/v1/sessions/consolidate`。 +- EverMemOS 返回候选记忆、可直接提升的长期记忆、重复/冲突信息和 review draft 路径。 +- Gateway 只把正常稳定候选写入长期 memory;高价值或冲突候选不会直接进入长期记忆,会写入: + +```text +obsidian-vault/Reviews/Queue/ +``` + ## MCP Tools `POST /mcp/rpc` 支持: @@ -200,6 +326,14 @@ obsidian-vault/01_Knowledge/Uploaded/ - `get_status` - `list_memories` - `list_resources` +- `memory_search` +- `memory_upsert` +- `memory_append_episode` +- `memory_commit_session` +- `memory_get_profile` +- `memory_list_namespaces` +- `memory_delete` +- `memory_feedback` ## Hermes Skill @@ -218,12 +352,24 @@ integrations/hermes/memory-gateway/ 主要脚本: ```text +scripts/evermemos_health.py +scripts/memory_create_user.py +scripts/memory_append_episode.py +scripts/memory_commit_session.py +scripts/memory_search.py +scripts/memory_upsert.py scripts/retrieve_memory.py scripts/commit_summary.py scripts/upload_knowledge.py scripts/search_obsidian.py ``` +检查 EverMemOS: + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/evermemos_health.py +``` + 检索记忆: ```bash @@ -257,6 +403,47 @@ python /home/tom/.hermes/skills/memory-gateway/scripts/upload_knowledge.py \ --persist-as resource ``` +完整长短期记忆测试: + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_create_user.py \ + --user-id user_tom \ + --display-name "Tom" \ + --preference language=zh-CN + +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_append_episode.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo \ + --tag decision \ + --text "结论:本机 EverMemOS 服务负责从 session episode 中整理稳定长期记忆。" + +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_append_episode.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id 
sess_demo \ + --tag review \ + --tag high-value \ + --text "重要:高价值记忆应该进入 Obsidian review queue,避免错误记忆污染长期系统。" + +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_commit_session.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo \ + --min-importance 0.6 + +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_search.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo \ + --query "EverMemOS 服务负责" \ + --limit 5 +``` + ## 测试 ```bash diff --git a/config.example.yaml b/config.example.yaml index e42d1a3..f7445e6 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -19,6 +19,18 @@ openviking: # 请求超时时间(秒) timeout: 30 +# EverMemOS 后台长期记忆整理服务 +evermemos: + enabled: true + url: "http://127.0.0.1:1995" + api_key: "" + timeout: 30 + health_path: "/health" + # 如果远端服务实际 endpoint 不同,改这里即可,不需要改代码。 + consolidate_path: "/v1/sessions/consolidate" + # POC 默认允许远端不可用时用本地确定性 worker 降级,方便开发测试。 + fallback_to_local: true + # 记忆配置 memory: # 默认命名空间 @@ -44,3 +56,9 @@ llm: obsidian: vault_path: "/home/tom/memory-gateway/obsidian-vault" knowledge_dir: "01_Knowledge/Uploaded" + review_dir: "Reviews/Queue" + +# v1 metadata storage. Use "memory" only for isolated unit tests. +storage: + backend: "sqlite" + sqlite_path: "/home/tom/memory-gateway/memory_gateway.sqlite3" diff --git a/docs/generic-memory-gateway-design.md b/docs/generic-memory-gateway-design.md new file mode 100644 index 0000000..9a0955c --- /dev/null +++ b/docs/generic-memory-gateway-design.md @@ -0,0 +1,767 @@ +# 通用 Memory Gateway 方案与 POC 骨架 + +本文基于当前仓库的轻量 FastAPI + MCP + OpenViking + Obsidian 能力扩展,不把系统设计成重平台。第一阶段目标是先跑通多用户隔离、namespace routing、记忆检索、写入、session commit 和人工 review 草稿,后续再替换持久化、向量索引和 EverMemOS worker。 + +## A. 
总体架构图 + +```mermaid +flowchart TB + subgraph Agents["Agent Frameworks"] + Nanobot[Nanobot] + Hermes[Hermes Agent] + OpenClaw[OpenClaw] + Other[Other Agents] + end + + subgraph Gateway["Memory Gateway"] + HTTP[HTTP API /v1] + MCP[MCP tools] + Auth[Auth / API Key / Future Login] + ACL[ACL & Visibility Policy] + Router[Namespace Router] + Audit[Audit Log] + Retrieval[Retrieval Orchestrator] + Writeback[Writeback Orchestrator] + end + + subgraph Skills["Skills Layer"] + Ingest[ingest] + Extract[extract] + Classify[classify] + Retrieve[retrieve] + Commit[commit] + Merge[merge] + Prune[prune] + Summarize[summarize] + end + + subgraph OpenViking["OpenViking"] + OVFS[context filesystem] + OVMem[memory] + OVRes[resources] + OVSkills[skills] + OVWorkspace[workspace] + end + + subgraph EverMemOS["EverMemOS"] + LTE[long-term extraction] + Consolidation[consolidation] + Decay[decay] + Dedup[dedup] + Profile[profile evolution] + end + + subgraph Obsidian["Obsidian"] + Vault[human editable memory vault] + Reviews[review queue] + Profiles[profiles] + LongTerm[long-term notes] + end + + subgraph Storage["Storage"] + DB[(metadata DB)] + Vector[(vector index)] + Files[(object / file storage)] + end + + Nanobot --> HTTP + Hermes --> MCP + OpenClaw --> HTTP + Other --> HTTP + Other --> MCP + + HTTP --> Auth --> ACL --> Router + MCP --> Auth + Router --> Retrieval + Router --> Writeback + ACL --> Audit + + Retrieval --> Skills + Writeback --> Skills + Skills --> OpenViking + Skills --> EverMemOS + Skills --> Obsidian + + Gateway --> DB + Gateway --> Vector + Gateway --> Files + OpenViking --> DB + OpenViking --> Vector + Obsidian --> Files + EverMemOS --> DB + EverMemOS --> Vector +``` + +## B. 
核心数据模型 + +代码骨架见 `memory_gateway/schemas.py`。核心模型如下。 + +### User + +```json +{ + "id": "user_tom", + "display_name": "Tom", + "status": "active", + "profile_namespace": "user/user_tom/profile", + "preferences": {"language": "zh-CN"}, + "created_at": "2026-04-30T10:00:00Z", + "updated_at": "2026-04-30T10:00:00Z" +} +``` + +### Agent + +```json +{ + "id": "agent_hermes_default", + "name": "Hermes Default Agent", + "framework": "hermes", + "owner_user_id": "user_tom", + "created_at": "2026-04-30T10:00:00Z" +} +``` + +### Workspace + +```json +{ + "id": "ws_memory_gateway", + "name": "Memory Gateway POC", + "owner_user_id": "user_tom", + "member_user_ids": ["user_tom"], + "allowed_agent_ids": ["agent_hermes_default"] +} +``` + +### Session + +```json +{ + "id": "sess_20260430_001", + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "status": "open", + "expires_at": "2026-05-07T10:00:00Z" +} +``` + +### MemoryRecord + +```json +{ + "id": "mem_abc123", + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "session_id": "sess_20260430_001", + "namespace": "user/user_tom/long_term", + "memory_type": "preference", + "content": "用户偏好中文输出,结构化但不要过度平台化。", + "summary": "中文、结构化、轻量 POC 优先。", + "tags": ["preference", "style"], + "importance": 0.8, + "confidence": 0.9, + "visibility": "private", + "source": "conversation", + "created_at": "2026-04-30T10:00:00Z", + "updated_at": "2026-04-30T10:00:00Z", + "expires_at": null, + "version": 1 +} +``` + +### EpisodeRecord + +短期过程记录,默认不进入 Obsidian,不自动成为长期记忆。 + +```json +{ + "id": "epi_abc123", + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "session_id": "sess_20260430_001", + "namespace": "session/sess_20260430_001/episodic", + "content": "本轮讨论了 Memory Gateway POC 范围。", + "summary": "确认 POC 优先做隔离、检索、写入和整理。", + "events": [], + "tags": ["design"] +} +``` + +### ProfileRecord + +```json 
+{ + "id": "profile_user_tom", + "user_id": "user_tom", + "namespace": "user/user_tom/profile", + "display_name": "Tom", + "stable_facts": ["正在设计通用 Memory Gateway"], + "preferences": {"language": "Chinese"}, + "working_style": ["偏好可落地 POC"], + "updated_from_memory_ids": ["mem_abc123"], + "version": 3 +} +``` + +### ACL / Visibility + +`visibility` 四档: + +- `private`:仅 `user_id` 相同可读写。 +- `agent-only`:同一 `user_id` 且同一 `agent_id` 可读写。 +- `workspace-shared`:在同一 `workspace_id` 且通过 workspace membership 授权后可读。 +- `global`:可公开检索,只能由受信任 actor 写入。 + +### AuditLog + +```json +{ + "id": "audit_abc123", + "actor_user_id": "user_tom", + "actor_agent_id": "agent_hermes_default", + "action": "memory_search", + "target_type": "memory", + "target_id": "mem_abc123", + "namespace": "user/user_tom/long_term", + "decision": "allow", + "reason": "private owner", + "created_at": "2026-04-30T10:00:00Z" +} +``` + +## C. Namespace 与隔离设计 + +推荐 namespace: + +```text +user/{user_id}/profile +user/{user_id}/preferences +user/{user_id}/long_term +agent/{agent_id}/memory +workspace/{workspace_id}/shared +session/{session_id}/episodic +global/public +``` + +隔离规则: + +- 用户隔离:所有 `user/{user_id}/...` 默认只允许同一 `user_id` 访问。Gateway 先校验 actor,再把 namespace 映射到 OpenViking URI。 +- Agent 隔离:`agent/{agent_id}/memory` 用于某个 agent 的工具经验、失败教训、prompt working notes。默认 `agent-only`。 +- Workspace 共享:`workspace/{workspace_id}/shared` 必须检查用户是否属于 workspace,agent 是否在 `allowed_agent_ids` 内。 +- Session 过期:`session/{session_id}/episodic` 必须有 TTL。过期后不可检索;只保留必要 audit。 +- 可跨 agent 共享:用户显式确认的 profile、preferences、user long_term、workspace shared、global public。 +- 不可跨 agent 共享:agent-only memory、未 commit 的 session episodic、低置信度候选记忆、含敏感凭据或临时日志的内容。 + +OpenViking URI 映射: + +```text +viking://memory/user/{user_id}/long_term/{memory_id}.json +viking://resources/workspace/{workspace_id}/shared/{slug}.md +viking://skills/memory-gateway/{skill_name} +``` + +## D. 
API 设计 + +第一阶段代码已挂载 `/v1` router,见 `memory_gateway/api_v1.py`。 + +### POST /v1/users + +Request: + +```json +{"user_id": "user_tom", "display_name": "Tom", "preferences": {"language": "zh-CN"}} +``` + +Response: + +```json +{"id": "user_tom", "display_name": "Tom", "profile_namespace": "user/user_tom/profile", "status": "active"} +``` + +### GET /v1/users/{user_id} + +Response: + +```json +{"id": "user_tom", "display_name": "Tom", "status": "active"} +``` + +### POST /v1/memory/search + +Request: + +```json +{ + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "query": "中文输出偏好", + "namespaces": ["user/user_tom/long_term"], + "limit": 5 +} +``` + +Response: + +```json +{ + "results": [ + { + "memory": { + "id": "mem_abc123", + "namespace": "user/user_tom/long_term", + "summary": "中文、结构化、轻量 POC 优先。" + }, + "score": 2.7 + } + ], + "total": 1 +} +``` + +### POST /v1/memory + +Request: + +```json +{ + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "memory_type": "preference", + "content": "用户偏好中文输出。", + "summary": "中文输出偏好", + "tags": ["preference"], + "importance": 0.8, + "confidence": 0.9, + "visibility": "private", + "source": "manual" +} +``` + +Response: + +```json +{"id": "mem_abc123", "namespace": "user/user_tom/long_term", "version": 1} +``` + +### GET /v1/memory/{memory_id} + +Request query: + +```text +?user_id=user_tom&agent_id=agent_hermes_default&workspace_id=ws_memory_gateway +``` + +Response: + +```json +{"id": "mem_abc123", "content": "用户偏好中文输出。", "visibility": "private"} +``` + +### PATCH /v1/memory/{memory_id} + +Request: + +```json +{"summary": "用户偏好中文、结构化、少废话。", "importance": 0.9} +``` + +Response: + +```json +{"id": "mem_abc123", "version": 2, "importance": 0.9} +``` + +### DELETE /v1/memory/{memory_id} + +Response: + +```json +{"deleted": true, "id": "mem_abc123"} +``` + +### POST /v1/episodes + +Request: + +```json +{ + "user_id": "user_tom", 
+ "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "session_id": "sess_001", + "content": "本轮完成了 namespace 和 ACL 设计。", + "tags": ["design"] +} +``` + +Response: + +```json +{"id": "epi_abc123", "namespace": "session/sess_001/episodic"} +``` + +### POST /v1/sessions/{session_id}/commit + +Request: + +```json +{ + "user_id": "user_tom", + "agent_id": "agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "promote": true, + "min_importance": 0.6, + "target_namespace": "user/user_tom/long_term" +} +``` + +Response: + +```json +{"session_id": "sess_001", "episodes": 3, "promoted": [{"id": "mem_def456"}]} +``` + +### GET /v1/users/{user_id}/profile + +Response: + +```json +{"user_id": "user_tom", "namespace": "user/user_tom/profile", "preferences": {"language": "zh-CN"}} +``` + +### POST /v1/memory/{memory_id}/feedback + +Request: + +```json +{"user_id": "user_tom", "feedback": "incorrect", "comment": "这是一次临时偏好,不应长期保留。"} +``` + +Response: + +```json +{"status": "ok", "memory_id": "mem_abc123", "feedback": "incorrect"} +``` + +### GET /v1/namespaces + +Request query: + +```text +?user_id=user_tom&agent_id=agent_hermes_default&workspace_id=ws_memory_gateway&session_id=sess_001 +``` + +Response: + +```json +[ + {"namespace": "user/user_tom/profile", "visibility": "private"}, + {"namespace": "agent/agent_hermes_default/memory", "visibility": "agent-only"}, + {"namespace": "workspace/ws_memory_gateway/shared", "visibility": "workspace-shared"} +] +``` + +### GET /v1/audit + +Response: + +```json +[{"action": "upsert_memory", "target_type": "memory", "decision": "allow"}] +``` + +### MCP tools + +目标 v1 tools 见 `memory_gateway/mcp_tools_v1.py`: + +- `memory_search` +- `memory_upsert` +- `memory_append_episode` +- `memory_commit_session` +- `memory_get_profile` +- `memory_list_namespaces` +- `memory_delete` +- `memory_feedback` + +示例 MCP call: + +```json +{ + "name": "memory_search", + "arguments": { + "user_id": "user_tom", + "agent_id": 
"agent_hermes_default", + "workspace_id": "ws_memory_gateway", + "query": "项目 POC 决策", + "limit": 5 + } +} +``` + +## E. Skills 设计 + +代码骨架位于 `memory_gateway/skills/`。 + +| Skill | 功能 | 输入 | 输出 | 触发时机 | 组件 | 写长期记忆 | +|---|---|---|---|---|---|---| +| `ingest_skill` | 标准化对话、文件、任务事件 | raw text/file/events | normalized payload | agent 写入 episode 前 | Gateway, file storage | 否 | +| `extract_memory_skill` | 从 episode/session 抽取候选记忆 | episode/session content | memory candidates | session commit / worker 定时 | LLM, EverMemOS | 否 | +| `classify_memory_skill` | 判断 memory_type、visibility、namespace | candidate memory | classification | 写入前 | ACL, namespace router | 否 | +| `retrieve_context_skill` | 聚合用户、agent、workspace 上下文 | query + context ids | ranked contexts | agent 调用前 | OpenViking, vector index | 否 | +| `commit_memory_skill` | 写入长期记忆 | MemoryRecord | stored record | 人工确认或 commit 通过 | DB, OpenViking | 是 | +| `summarize_episode_skill` | 压缩 episode | episode content | summary | session commit | LLM | 否 | +| `merge_memory_skill` | 合并重复或相近记忆 | memory ids | merged memory | EverMemOS 整理 | DB, vector index | 是 | +| `prune_memory_skill` | 衰减、归档、删除低质记忆 | policy + memory ids | archived/deleted list | 定时 worker | EverMemOS | 是 | +| `export_to_obsidian_skill` | 生成 Obsidian review draft | high-value memory | markdown draft | 高价值或需人工确认 | Obsidian | 否 | +| `import_from_obsidian_skill` | 从人工维护笔记导入记忆 | markdown path | MemoryRecord | vault sync | Obsidian, OpenViking | 是 | + +## F. 
Obsidian Vault 设计 + +推荐目录: + +```text +obsidian-vault/ +├── Users/ +│ └── {user_id}/ +│ ├── Profile.md +│ ├── Preferences.md +│ └── LongTerm/ +├── Agents/ +│ └── {agent_id}/Experience.md +├── Workspaces/ +│ └── {workspace_id}/Shared.md +├── Memories/ +│ ├── LongTerm/ +│ └── Archived/ +├── Profiles/ +├── Reviews/ +│ ├── Queue/ +│ ├── Accepted/ +│ └── Rejected/ +├── Exports/ +└── Templates/ +``` + +进入 Obsidian 的内容: + +- 人工可维护 profile、preferences、长期总结。 +- 高价值 workspace 知识、项目决策、复用经验。 +- EverMemOS 标记为 `needs_review` 的长期记忆草稿。 + +不进入 Obsidian 的内容: + +- 全量原始对话。 +- 高频工具日志、临时 session trace。 +- 低置信度候选记忆。 +- 敏感凭据、token、临时错误栈。 + +标签体系: + +```text +#memory/profile +#memory/preference +#memory/long-term +#memory/workspace +#memory/agent-experience +#memory/review +#memory/conflict +#memory/deprecated +#source/evermemos +#source/manual +#visibility/private +#visibility/workspace-shared +``` + +模板文件已加入 `obsidian-vault/05_Templates/`。 + +## G. OpenViking 设计 + +OpenViking 作为统一 context 层,Gateway 不要求 agent 直接理解 OpenViking 内部结构。 + +组织方式: + +```text +viking://memory/user/{user_id}/profile +viking://memory/user/{user_id}/preferences +viking://memory/user/{user_id}/long_term +viking://memory/agent/{agent_id}/memory +viking://memory/workspace/{workspace_id}/shared +viking://resources/user/{user_id}/obsidian/{note_id}.md +viking://skills/memory-gateway/{skill_name} +``` + +检索路径: + +1. Agent 调用 Gateway `/v1/memory/search` 或 MCP `memory_search`。 +2. Gateway 执行 Auth、ACL、namespace expansion。 +3. Gateway 查询 metadata DB 和 vector index,必要时调用 OpenViking search。 +4. 返回统一 `MemoryRecord` 或 context chunk,不暴露底层差异。 + +同步: + +- Obsidian accepted note 通过 `import_from_obsidian_skill` 写回 Gateway,再同步 OpenViking resource。 +- EverMemOS consolidation 后写入 `user/{user_id}/long_term` 或 `workspace/{workspace_id}/shared`。 +- Gateway 保存 `source_ref`,避免 OpenViking 与 Obsidian 互相重复导入。 + +## H. 
EverMemOS 设计 + +输入来源: + +- `EpisodeRecord`:对话片段、任务执行摘要、agent 过程事件。 +- `SessionRecord`:session commit 包。 +- `MemoryFeedback`:incorrect、duplicate、outdated 等反馈。 +- Obsidian review 结果:accepted/rejected/edited。 + +整理流程: + +1. 抽取:从 episode 中提炼候选事实、偏好、决策、经验。 +2. 打分:根据重要性、稳定性、重复出现次数、来源可信度打分。 +3. 去重:按 semantic hash + embedding 相似度查找近似 MemoryRecord。 +4. 合并:相同事实合并 evidence;更高置信度覆盖低置信度。 +5. 冲突检测:同一 subject 的相反陈述标记 `needs_review`,不自动覆盖。 +6. 衰减:长时间未命中且低反馈的记忆降低 importance。 +7. 归档:过期、错误、低置信度、被人工拒绝的记忆转 archived。 +8. profile evolution:只有稳定、重复、高置信偏好进入 ProfileRecord。 + +污染控制: + +- session 临时内容不直接提升为长期记忆。 +- LLM 抽取结果默认是 candidate,需阈值或人工确认。 +- 每条长期记忆保留 source、confidence、version、feedback。 +- 对 profile 更新采用 evidence count,禁止一次对话永久改写强偏好。 + +## I. 工程目录结构 + +当前仓库保留 `memory_gateway/` 包名,目标结构如下: + +```text +memory-gateway/ +├── memory_gateway/ +│ ├── api_v1.py # v1 HTTP API +│ ├── mcp_tools_v1.py # v1 MCP tool contract +│ ├── schemas.py # User/Memory/Episode/Profile/ACL/Audit +│ ├── namespace.py # namespace builder + ACL helpers +│ ├── services.py # orchestration service +│ ├── repositories.py # POC in-memory repo; later DB repo +│ ├── security/ # future auth, RBAC, audit policy +│ ├── skills/ +│ │ ├── ingest_skill.py +│ │ ├── extract_memory_skill.py +│ │ ├── classify_memory_skill.py +│ │ ├── retrieve_context_skill.py +│ │ ├── commit_memory_skill.py +│ │ ├── summarize_episode_skill.py +│ │ ├── merge_memory_skill.py +│ │ ├── prune_memory_skill.py +│ │ ├── export_to_obsidian_skill.py +│ │ └── import_from_obsidian_skill.py +│ ├── adapters/ +│ │ ├── openviking.py +│ │ ├── evermemos.py +│ │ └── obsidian.py +│ └── workers/ +│ └── evermemos_worker.py +├── obsidian-vault/ +├── integrations/ +│ ├── nanobot/ +│ ├── hermes/ +│ └── openclaw/ +└── tests/ +``` + +如果未来迁移到更标准的 `app/`,可把 `memory_gateway/api_v1.py` 对应到 `app/api`,`schemas.py` 对应到 `app/schemas`,`services.py` 对应到 `app/services`。 + +## J. 
2 到 4 周 POC 实施计划 + +第一周: + +- 完成 `/v1/users`、`/v1/memory`、`/v1/memory/search`、`/v1/episodes`。 +- 实现 namespace router、visibility、基础 audit。 +- 存储先用 SQLite 或当前内存 repo,搜索先用 lexical,OpenViking 作为可选后端。 + +第二周: + +- 接入 OpenViking URI 写入和检索。 +- 实现 `retrieve_context_skill`、`commit_memory_skill`、`summarize_episode_skill`。 +- 给 Hermes/Nanobot/OpenClaw 提供最小 client 示例。 + +第三周: + +- 加 EverMemOS worker 原型:session commit、candidate extraction、dedup、merge。 +- 增加 feedback 流程:incorrect、duplicate、outdated 影响 prune/merge。 +- 生成 Obsidian review draft,而不是直接写入最终知识库。 + +第四周: + +- Obsidian import/export 双向同步。 +- 增加 profile evolution 的阈值和 evidence 机制。 +- 补充权限测试、污染测试、重复记忆测试、跨 agent 检索测试。 + +先做: + +- 用户隔离、namespace、memory CRUD、episode append、session commit、basic search、audit。 + +暂不做: + +- 完整登录系统、复杂 RBAC、多租户计费、实时同步、复杂 UI、全量向量数据库治理。 + +POC 成功指标: + +- 不同 `user_id` 之间无法互相读写 private memory。 +- 同一 workspace 的共享记忆可被授权 agent 检索。 +- session 记忆不会自动污染长期记忆。 +- 10 条重复候选能合并到 1 到 2 条长期记忆。 +- 错误反馈后,该记忆不再进入默认 retrieval。 +- Hermes/Nanobot/OpenClaw 至少两个框架能通过统一 API 调用。 + +## K. 
推荐默认方案 + +第一阶段最合理默认方案: + +- FastAPI 提供 `/v1` 统一 HTTP API。 +- MCP 先保留现有 `/mcp/rpc`,新增 `memory_gateway/mcp_tools_v1.py` 作为目标 contract。 +- 存储使用 SQLite metadata + 本地文件存 object;当前代码先用 in-memory repo 验证接口。 +- 搜索先用 OpenViking search + 简单 lexical fallback;向量索引第二阶段引入。 +- Obsidian 只保存人工可读的高价值长期记忆和 review draft。 +- EverMemOS 第一阶段不做独立大系统,只做 worker 模块:extract、dedup、merge、prune、profile update。 + +第一阶段实现 API: + +- `POST /v1/users` +- `GET /v1/users/{user_id}` +- `POST /v1/memory/search` +- `POST /v1/memory` +- `GET /v1/memory/{memory_id}` +- `POST /v1/episodes` +- `POST /v1/sessions/{session_id}/commit` +- `GET /v1/users/{user_id}/profile` +- `GET /v1/namespaces` + +第一阶段实现 skills: + +- `ingest_skill` +- `summarize_episode_skill` +- `retrieve_context_skill` +- `commit_memory_skill` +- `export_to_obsidian_skill` + +第二阶段再补: + +- `extract_memory_skill` +- `classify_memory_skill` +- `merge_memory_skill` +- `prune_memory_skill` +- `import_from_obsidian_skill` +- 更完整的 EverMemOS consolidation 和 profile evolution。 + +角色分工: + +- Obsidian 第一阶段:review draft、人类确认 profile/长期知识。第二阶段:双向同步。 +- OpenViking 第一阶段:统一 context/resource 检索入口。第二阶段:承载多 namespace context filesystem 和 skill registry。 +- EverMemOS 第一阶段:session commit worker。第二阶段:长期记忆治理、衰减、冲突检测、profile evolution。 + diff --git a/integrations/hermes/memory-gateway/SKILL.md b/integrations/hermes/memory-gateway/SKILL.md index 9e6e3a6..e7974ed 100644 --- a/integrations/hermes/memory-gateway/SKILL.md +++ b/integrations/hermes/memory-gateway/SKILL.md @@ -1,54 +1,211 @@ --- name: memory-gateway -description: Use this skill when an agent or harness needs reusable memory: search prior context, retrieve OpenViking resources, upload documents into knowledge, summarize arbitrary content with the Memory Gateway LLM, commit final conclusions, or cite related Obsidian notes. This skill is domain-neutral. 
-version: 2.0.0 +description: Use this skill when Hermes needs shared long-term memory, user-scoped preferences/profile, workspace memory, session episode capture, Memory Gateway retrieval, OpenViking context search, Obsidian document upload/review, or session commit through the standalone EverMemOS service. This skill is domain-neutral. +version: 3.1.0 metadata: hermes: - tags: [memory, openviking, obsidian, knowledge, retrieval, summarization, document-ingestion, agent-context] + tags: [memory, memory-gateway, openviking, obsidian, evermemos, long-term-memory, retrieval, agent-context] --- # Memory Gateway -Use this skill as a generic memory layer for any agent / harness. It connects Hermes to the local Memory Gateway at `http://127.0.0.1:1934`, which fronts OpenViking and an Obsidian vault. +Use this skill as Hermes' generic memory layer. It connects Hermes to the local Memory Gateway at `http://127.0.0.1:1934`. -## Trigger Rule +The gateway provides: -Use this skill when the user asks to: -- search prior memory or retrieve related context -- upload a document and make it reusable knowledge -- summarize content and store it as memory/resource -- commit final conclusions, decisions, lessons learned, or research notes -- cite related OpenViking resources or Obsidian notes -- prepare context for another agent or workflow - -Do not assume any domain-specific workflow. Treat Memory Gateway as a reusable memory and knowledge entrypoint. +- v1 user/agent/workspace/session aware memory APIs backed by SQLite metadata. +- ACL and namespace routing before retrieval. +- OpenViking fan-out search for visible namespaces. +- Session episode capture and commit through the standalone EverMemOS HTTP service, with Gateway local fallback only when configured. +- Obsidian review drafts for high-value or conflicting long-term memory candidates. +- Legacy summary/document upload endpoints for LLM summarization and Obsidian knowledge ingestion. 
## Environment Defaults: + - Memory Gateway URL: `http://127.0.0.1:1934` +- EverMemOS URL through Gateway config: `http://127.0.0.1:1995` - Obsidian vault: `/home/tom/memory-gateway/obsidian-vault` -- Default namespace: `memory-gateway` +- Default review queue: `/home/tom/memory-gateway/obsidian-vault/Reviews/Queue` Optional env vars: + - `MEMORY_GATEWAY_URL` - `MEMORY_GATEWAY_API_KEY` - `MEMORY_GATEWAY_OBSIDIAN_VAULT` -## Core Workflows +## Recommended Hermes Workflow -### 1. Retrieve Context +For normal agent work: + +1. Search memory before answering if prior context may matter. +2. Append important session episodes while working. +3. Commit the session at the end so EverMemOS can promote stable memories. +4. Use feedback to mark incorrect, duplicate, outdated, or useful memories. +5. Upload documents only when they are reusable knowledge, not raw noisy logs. + +Do not write full transcripts to long-term memory. Use episodes for temporary process capture and commit only stable conclusions. + +## v1 Memory Commands + +### Check EverMemOS ```bash -python /home/tom/.hermes/skills/memory-gateway/scripts/retrieve_memory.py \ - --query "project decision memory gateway LLM summary" \ - --uri viking://resources \ +python /home/tom/.hermes/skills/memory-gateway/scripts/evermemos_health.py +``` + +Expected healthy response includes `status: ok` and `response.service: evermemos-local`. + +### Create User + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_create_user.py \ + --user-id user_tom \ + --display-name "Tom" \ + --preference language=zh-CN +``` + +### Search ACL-Aware Memory + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_search.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --query "namespace ACL decision" \ --limit 5 ``` -Use retrieval before answering when prior context may materially improve correctness. +Equivalent backward-compatible command: -### 2. 
Summarize And Commit +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/retrieve_memory.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --query "namespace ACL decision" \ + --limit 5 +``` + +If `retrieve_memory.py` is called without `--user-id`, it falls back to the legacy `/api/search` endpoint. + +### Upsert Long-Term Memory + +Use this only for stable, concise, reusable memory. + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_upsert.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --memory-type preference \ + --visibility private \ + --importance 0.8 \ + --confidence 0.9 \ + --tag preference \ + --summary "中文、结构化、轻量 POC 优先" \ + --text "用户偏好中文输出,结构化但不要过度工程化。" +``` + +### Append Session Episode + +Use this during a task to record useful process notes without immediately polluting long-term memory. + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_append_episode.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo \ + --tag decision \ + --text "结论:这个项目必须保留用户隔离和 namespace ACL。" +``` + +### Commit Session Through EverMemOS + +This asks Memory Gateway to call the standalone EverMemOS service configured in `config.yaml`. +For local POC the default service is `http://127.0.0.1:1995`. If `evermemos.fallback_to_local` is true and the service is unavailable, Gateway returns `evermemos_backend: local-fallback`. 
+ +- extracts candidate memories from session episodes +- deduplicates exact repeated candidates +- detects simple conflicts +- promotes normal stable memories into SQLite long-term memory +- sends high-value or conflicting candidates to Obsidian review drafts + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_commit_session.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo \ + --min-importance 0.6 +``` + +Review drafts are written under: + +```text +/home/tom/memory-gateway/obsidian-vault/Reviews/Queue/ +``` + +### Get Profile + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_get_profile.py \ + --user-id user_tom +``` + +### List Visible Namespaces + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_list_namespaces.py \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --session-id sess_demo +``` + +### Patch Memory + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_patch.py \ + --memory-id mem_xxx \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --summary "用户偏好中文、结构化、少废话。" \ + --importance 0.9 \ + --tag preference \ + --tag confirmed +``` + +### Feedback + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_feedback.py \ + --memory-id mem_xxx \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway \ + --feedback incorrect \ + --comment "这是临时偏好,不应长期保留。" +``` + +### Delete Memory + +```bash +python /home/tom/.hermes/skills/memory-gateway/scripts/memory_delete.py \ + --memory-id mem_xxx \ + --user-id user_tom \ + --agent-id agent_hermes \ + --workspace-id ws_memory_gateway +``` + +## Knowledge And Obsidian Commands + +### Summarize And Commit Via Legacy LLM Endpoint + +Use this for high-value text that should become an OpenViking resource or summarized memory. 
```bash python /home/tom/.hermes/skills/memory-gateway/scripts/commit_summary.py \ @@ -60,9 +217,9 @@ python /home/tom/.hermes/skills/memory-gateway/scripts/commit_summary.py \ --text "" ``` -This calls `POST /api/summary`, which uses the configured LLM and writes to OpenViking when `persist-as` is not `none`. +This calls `POST /api/summary`. -### 3. Upload Document As Knowledge +### Upload Document As Knowledge ```bash python /home/tom/.hermes/skills/memory-gateway/scripts/upload_knowledge.py \ @@ -76,7 +233,7 @@ python /home/tom/.hermes/skills/memory-gateway/scripts/upload_knowledge.py \ This calls `POST /api/knowledge/upload`: document -> MarkItDown Markdown -> Obsidian note -> LLM summary -> OpenViking resource. -### 4. Search Obsidian Notes +### Search Local Obsidian Notes ```bash python /home/tom/.hermes/skills/memory-gateway/scripts/search_obsidian.py \ @@ -84,6 +241,21 @@ python /home/tom/.hermes/skills/memory-gateway/scripts/search_obsidian.py \ --limit 5 ``` +## MCP Tool Names + +The gateway also exposes these v1 tools through `/mcp/rpc`: + +- `memory_search` +- `memory_upsert` +- `memory_append_episode` +- `memory_commit_session` +- `memory_get_profile` +- `memory_list_namespaces` +- `memory_delete` +- `memory_feedback` + +Use MCP tools when Hermes has an MCP bridge available. Use the scripts above when Hermes runs skills as shell commands. 
+ ## Output Template When using this skill, answer with: @@ -92,25 +264,27 @@ When using this skill, answer with: ## Answer -## Memory / Resource References -- `` — `<viking://...>` — why it matters +## Memory References +- `<memory_id or URI>` — `<namespace>` — why it matters -## Obsidian References -- `<note.md>` — `<relative path>` — why it matters +## Obsidian Review +- `<draft path>` — why it needs review -## Suggested Memory Commit -- commit: yes/no -- namespace: -- memory_type: -- tags: -- resource_uri: if committed +## Memory Action +- searched: yes/no +- appended_episode: yes/no +- committed_session: yes/no +- promoted_memory_count: +- review_draft_count: ``` ## Guardrails -- Do not store raw noisy data as long-term memory when a concise summary is enough. -- Prefer LLM summaries and structured artifacts over full chat transcripts. +- Do not store raw noisy data as long-term memory. +- Use `memory_append_episode.py` for temporary process notes. +- Use `memory_commit_session.py` at task end to let EverMemOS decide what should persist. +- Use `memory_upsert.py` directly only for stable, concise, user-approved memory. - Do not commit secrets, credentials, tokens, private keys, or unnecessary personal data. - If content is sensitive, summarize and redact before committing. -- If retrieval quality looks noisy, state that and cite only useful results. -- Always report whether a commit/upload actually succeeded and include the returned resource URI when available. +- High-value or conflicting candidates should go to Obsidian review drafts before becoming durable memory. +- Always report whether retrieval, episode append, session commit, or upload actually succeeded. 
diff --git a/integrations/hermes/memory-gateway/scripts/_client.py b/integrations/hermes/memory-gateway/scripts/_client.py index 082f792..b346ae2 100755 --- a/integrations/hermes/memory-gateway/scripts/_client.py +++ b/integrations/hermes/memory-gateway/scripts/_client.py @@ -2,6 +2,7 @@ from __future__ import annotations import json import os +import urllib.parse import urllib.request from typing import Any @@ -17,3 +18,35 @@ def post_json(path: str, payload: dict[str, Any], gateway_url: str = DEFAULT_GAT req.add_header("X-API-Key", api_key) with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode("utf-8")) + + +def get_json(path: str, params: dict[str, Any] | None = None, gateway_url: str = DEFAULT_GATEWAY_URL, api_key: str = DEFAULT_GATEWAY_API_KEY, timeout: int = 120) -> dict[str, Any] | list[Any]: + query = urllib.parse.urlencode({k: v for k, v in (params or {}).items() if v not in (None, "")}) + url = gateway_url.rstrip("/") + path + (f"?{query}" if query else "") + req = urllib.request.Request(url, method="GET") + if api_key: + req.add_header("X-API-Key", api_key) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def patch_json(path: str, payload: dict[str, Any], params: dict[str, Any] | None = None, gateway_url: str = DEFAULT_GATEWAY_URL, api_key: str = DEFAULT_GATEWAY_API_KEY, timeout: int = 120) -> dict[str, Any]: + query = urllib.parse.urlencode({k: v for k, v in (params or {}).items() if v not in (None, "")}) + url = gateway_url.rstrip("/") + path + (f"?{query}" if query else "") + data = json.dumps(payload, ensure_ascii=False).encode("utf-8") + req = urllib.request.Request(url, data=data, method="PATCH") + req.add_header("Content-Type", "application/json") + if api_key: + req.add_header("X-API-Key", api_key) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def delete_json(path: str, 
params: dict[str, Any] | None = None, gateway_url: str = DEFAULT_GATEWAY_URL, api_key: str = DEFAULT_GATEWAY_API_KEY, timeout: int = 120) -> dict[str, Any]: + query = urllib.parse.urlencode({k: v for k, v in (params or {}).items() if v not in (None, "")}) + url = gateway_url.rstrip("/") + path + (f"?{query}" if query else "") + req = urllib.request.Request(url, method="DELETE") + if api_key: + req.add_header("X-API-Key", api_key) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) diff --git a/integrations/hermes/memory-gateway/scripts/evermemos_health.py b/integrations/hermes/memory-gateway/scripts/evermemos_health.py new file mode 100644 index 0000000..cee6cf9 --- /dev/null +++ b/integrations/hermes/memory-gateway/scripts/evermemos_health.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json + +from _client import DEFAULT_GATEWAY_API_KEY, DEFAULT_GATEWAY_URL, get_json + + +def main() -> None: + parser = argparse.ArgumentParser(description="Check standalone EverMemOS health through Memory Gateway.") + parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL) + parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY) + args = parser.parse_args() + print(json.dumps(get_json("/v1/evermemos/health", gateway_url=args.gateway_url, api_key=args.api_key), ensure_ascii=False, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/integrations/hermes/memory-gateway/scripts/memory_append_episode.py b/integrations/hermes/memory-gateway/scripts/memory_append_episode.py new file mode 100644 index 0000000..e51c65b --- /dev/null +++ b/integrations/hermes/memory-gateway/scripts/memory_append_episode.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + +from _client import DEFAULT_GATEWAY_API_KEY, DEFAULT_GATEWAY_URL, post_json + + +def 
def load_content(args: argparse.Namespace) -> str:
    """Return the episode text from --file, --text, or stdin, in that priority."""
    if args.file:
        return Path(args.file).read_text(encoding="utf-8")
    return args.text if args.text else sys.stdin.read().strip()


def main() -> None:
    """CLI entry point: POST one episode record to /v1/episodes and print the reply."""
    parser = argparse.ArgumentParser(description="Append session episode memory without directly promoting it.")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--session-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--namespace", default="")
    parser.add_argument("--text", default="")
    parser.add_argument("--file", default="")
    parser.add_argument("--tag", action="append", default=[])
    parser.add_argument("--source", default="conversation")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    episode_text = load_content(args)
    if not episode_text:
        parser.error("No episode content provided via --text, --file, or stdin")

    request_body = {
        "user_id": args.user_id,
        "agent_id": args.agent_id if args.agent_id else None,
        "workspace_id": args.workspace_id if args.workspace_id else None,
        "session_id": args.session_id,
        "namespace": args.namespace if args.namespace else None,
        "content": episode_text,
        "tags": args.tag,
        "source": args.source,
    }
    response = post_json("/v1/episodes", request_body, args.gateway_url, args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def main() -> None:
    """CLI entry point: ask the gateway to consolidate one session's episodes.

    POSTs to /v1/sessions/{session_id}/commit; the gateway decides whether to
    use the external EverMemOS service or its local fallback worker.
    """
    parser = argparse.ArgumentParser(description="Commit a session through the minimal EverMemOS consolidation worker.")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--session-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--target-namespace", default="")
    parser.add_argument("--min-importance", type=float, default=0.6)
    parser.add_argument("--no-promote", action="store_true")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    request_body = {
        "user_id": args.user_id,
        "agent_id": args.agent_id if args.agent_id else None,
        "workspace_id": args.workspace_id if args.workspace_id else None,
        "session_id": args.session_id,
        "promote": not args.no_promote,
        "min_importance": args.min_importance,
        "target_namespace": args.target_namespace if args.target_namespace else None,
    }
    response = post_json(f"/v1/sessions/{args.session_id}/commit", request_body, args.gateway_url, args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def parse_preferences(items: list[str]) -> dict[str, str]:
    """Parse repeatable ``key=value`` preference flags into a dict.

    Keys and values are stripped of surrounding whitespace; a later duplicate
    key overwrites an earlier one.

    Raises:
        ValueError: if an item contains no ``=`` or its key is empty
            (the previous inline parser silently accepted an empty key).
    """
    preferences: dict[str, str] = {}
    for item in items:
        key, sep, value = item.partition("=")
        key = key.strip()
        if not sep or not key:
            raise ValueError(f"Invalid --preference {item!r}; expected key=value")
        preferences[key] = value.strip()
    return preferences


def main() -> None:
    """CLI entry point: create or replace a v1 user via POST /v1/users."""
    parser = argparse.ArgumentParser(description="Create or replace a Memory Gateway v1 user.")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--display-name", required=True)
    parser.add_argument("--preference", action="append", default=[], help="Preference as key=value; repeatable")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    try:
        preferences = parse_preferences(args.preference)
    except ValueError as exc:
        # Surface malformed flags as a normal argparse usage error.
        parser.error(str(exc))

    payload = {
        "user_id": args.user_id,
        "display_name": args.display_name,
        "preferences": preferences,
    }
    print(json.dumps(post_json("/v1/users", payload, args.gateway_url, args.api_key), ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def main() -> None:
    """CLI entry point: attach quality feedback to one MemoryRecord.

    POSTs to /v1/memory/{memory_id}/feedback and prints the gateway response.
    """
    parser = argparse.ArgumentParser(description="Attach quality feedback to a MemoryRecord.")
    parser.add_argument("--memory-id", required=True)
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--session-id", default="")
    parser.add_argument("--feedback", required=True, choices=["useful", "not_useful", "incorrect", "duplicate", "outdated"])
    parser.add_argument("--comment", default="")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    request_body = {
        "user_id": args.user_id,
        "agent_id": args.agent_id if args.agent_id else None,
        "workspace_id": args.workspace_id if args.workspace_id else None,
        "session_id": args.session_id if args.session_id else None,
        "feedback": args.feedback,
        "comment": args.comment if args.comment else None,
    }
    response = post_json(f"/v1/memory/{args.memory_id}/feedback", request_body, args.gateway_url, args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def main() -> None:
    """CLI entry point: list namespaces visible to the given caller context.

    GETs /v1/namespaces with the identity fields as query parameters; the
    _client helper drops empty-string values from the query string.
    """
    parser = argparse.ArgumentParser(description="List namespaces visible to a user/agent/workspace/session context.")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--session-id", default="")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    caller_context = {
        "user_id": args.user_id,
        "agent_id": args.agent_id,
        "workspace_id": args.workspace_id,
        "session_id": args.session_id,
    }
    response = get_json("/v1/namespaces", params=caller_context, gateway_url=args.gateway_url, api_key=args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def build_patch_payload(args: argparse.Namespace) -> dict[str, Any]:
    """Collect only the patch fields the caller explicitly provided.

    ``content`` and ``summary`` use ``None`` (not ``""``) as the
    "not provided" sentinel, so an explicit empty string can clear a field —
    the previous truthiness check made that impossible.
    """
    payload: dict[str, Any] = {}
    if args.content is not None:
        payload["content"] = args.content
    if args.summary is not None:
        payload["summary"] = args.summary
    if args.tag is not None:
        payload["tags"] = args.tag
    if args.importance is not None:
        payload["importance"] = args.importance
    if args.confidence is not None:
        payload["confidence"] = args.confidence
    if args.visibility:
        payload["visibility"] = args.visibility
    return payload


def main() -> None:
    """CLI entry point: PATCH a MemoryRecord through the gateway."""
    parser = argparse.ArgumentParser(description="Patch a MemoryRecord.")
    parser.add_argument("--memory-id", required=True)
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--session-id", default="")
    # None means "flag not passed"; an explicit --content "" is now honored.
    parser.add_argument("--content", default=None)
    parser.add_argument("--summary", default=None)
    parser.add_argument("--tag", action="append", default=None)
    parser.add_argument("--importance", type=float, default=None)
    parser.add_argument("--confidence", type=float, default=None)
    parser.add_argument("--visibility", choices=["private", "agent-only", "workspace-shared", "global"], default=None)
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    payload = build_patch_payload(args)
    if not payload:
        parser.error("No patch fields provided")

    params = {
        "user_id": args.user_id,
        "agent_id": args.agent_id,
        "workspace_id": args.workspace_id,
        "session_id": args.session_id,
    }
    print(json.dumps(patch_json(f"/v1/memory/{args.memory_id}", payload, params=params, gateway_url=args.gateway_url, api_key=args.api_key), ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def main() -> None:
    """CLI entry point: run an ACL-aware v1 memory search and print the hits."""
    parser = argparse.ArgumentParser(description="Search v1 Memory Gateway with user/agent/workspace/session ACL.")
    parser.add_argument("--query", required=True)
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--session-id", default="")
    parser.add_argument("--namespace", action="append", default=[], help="Allowed namespace to search; repeatable")
    parser.add_argument("--memory-type", action="append", default=[], help="Memory type filter; repeatable")
    parser.add_argument("--tag", action="append", default=[], help="Tag filter; repeatable")
    parser.add_argument("--limit", type=int, default=5)
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    request_body = {
        "user_id": args.user_id,
        "agent_id": args.agent_id if args.agent_id else None,
        "workspace_id": args.workspace_id if args.workspace_id else None,
        "session_id": args.session_id if args.session_id else None,
        "query": args.query,
        "namespaces": args.namespace,
        "memory_types": args.memory_type,
        "tags": args.tag,
        "limit": args.limit,
    }
    response = post_json("/v1/memory/search", request_body, args.gateway_url, args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
def load_content(args: argparse.Namespace) -> str:
    """Return the memory content from --file, --text, or stdin, in that priority."""
    if args.file:
        return Path(args.file).read_text(encoding="utf-8")
    return args.text if args.text else sys.stdin.read().strip()


def main() -> None:
    """CLI entry point: create one v1 MemoryRecord via POST /v1/memory."""
    parser = argparse.ArgumentParser(description="Create a v1 MemoryRecord through Memory Gateway.")
    parser.add_argument("--user-id", required=True)
    parser.add_argument("--agent-id", default="")
    parser.add_argument("--workspace-id", default="")
    parser.add_argument("--session-id", default="")
    parser.add_argument("--namespace", default="")
    parser.add_argument("--memory-type", default="fact")
    parser.add_argument("--text", default="")
    parser.add_argument("--file", default="")
    parser.add_argument("--summary", default="")
    parser.add_argument("--tag", action="append", default=[])
    parser.add_argument("--importance", type=float, default=0.5)
    parser.add_argument("--confidence", type=float, default=0.8)
    parser.add_argument("--visibility", choices=["private", "agent-only", "workspace-shared", "global"], default="private")
    parser.add_argument("--source", default="manual")
    parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL)
    parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY)
    args = parser.parse_args()

    memory_text = load_content(args)
    if not memory_text:
        parser.error("No memory content provided via --text, --file, or stdin")

    request_body = {
        "user_id": args.user_id,
        "agent_id": args.agent_id if args.agent_id else None,
        "workspace_id": args.workspace_id if args.workspace_id else None,
        "session_id": args.session_id if args.session_id else None,
        "namespace": args.namespace if args.namespace else None,
        "memory_type": args.memory_type,
        "content": memory_text,
        "summary": args.summary if args.summary else None,
        "tags": args.tag,
        "importance": args.importance,
        "confidence": args.confidence,
        "visibility": args.visibility,
        "source": args.source,
    }
    response = post_json("/v1/memory", request_body, args.gateway_url, args.api_key)
    print(json.dumps(response, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
viking://resources/project") parser.add_argument("--namespace", default="", help="Optional namespace if URI is not provided") + parser.add_argument("--user-id", default="", help="Use v1 ACL-aware search when provided") + parser.add_argument("--agent-id", default="") + parser.add_argument("--workspace-id", default="") + parser.add_argument("--session-id", default="") parser.add_argument("--limit", type=int, default=5) parser.add_argument("--gateway-url", default=DEFAULT_GATEWAY_URL) parser.add_argument("--api-key", default=DEFAULT_GATEWAY_API_KEY) args = parser.parse_args() - payload = {"query": args.query, "limit": args.limit} - if args.uri: - payload["uri"] = args.uri - if args.namespace: - payload["namespace"] = args.namespace - result = post_json("/api/search", payload, args.gateway_url, args.api_key) + if args.user_id: + payload = { + "user_id": args.user_id, + "agent_id": args.agent_id or None, + "workspace_id": args.workspace_id or None, + "session_id": args.session_id or None, + "query": args.query, + "namespaces": [args.namespace] if args.namespace else [], + "limit": args.limit, + } + result = post_json("/v1/memory/search", payload, args.gateway_url, args.api_key) + else: + payload = {"query": args.query, "limit": args.limit} + if args.uri: + payload["uri"] = args.uri + if args.namespace: + payload["namespace"] = args.namespace + result = post_json("/api/search", payload, args.gateway_url, args.api_key) print(json.dumps(result, ensure_ascii=False, indent=2)) diff --git a/memory_gateway/api_v1.py b/memory_gateway/api_v1.py new file mode 100644 index 0000000..89ce3a3 --- /dev/null +++ b/memory_gateway/api_v1.py @@ -0,0 +1,115 @@ +"""Generic Memory Gateway v1 HTTP API.""" +from __future__ import annotations + +from typing import Optional + +from fastapi import APIRouter, Depends, Query + +from .schemas import ( + AccessContext, + CommitSessionRequest, + CreateUserRequest, + EpisodeAppendRequest, + MemoryFeedbackRequest, + MemoryPatchRequest, + 
MemorySearchRequest, + MemoryUpsertRequest, +) +from .server_auth import verify_api_key_compat +from .services import service + +router = APIRouter(prefix="/v1", tags=["memory-v1"], dependencies=[Depends(verify_api_key_compat)]) + + +@router.post("/users") +async def create_user(request: CreateUserRequest): + return service.create_user(request) + + +@router.get("/users/{user_id}") +async def get_user(user_id: str): + return service.get_user(user_id) + + +@router.post("/memory/search") +async def search_memory(request: MemorySearchRequest): + return await service.search_memory_with_openviking(request) + + +@router.post("/memory") +async def upsert_memory(request: MemoryUpsertRequest): + return service.upsert_memory(request) + + +@router.get("/memory/{memory_id}") +async def get_memory( + memory_id: str, + user_id: str = Query(...), + agent_id: Optional[str] = Query(default=None), + workspace_id: Optional[str] = Query(default=None), + session_id: Optional[str] = Query(default=None), +): + return service.get_memory(memory_id, AccessContext(user_id=user_id, agent_id=agent_id, workspace_id=workspace_id, session_id=session_id)) + + +@router.patch("/memory/{memory_id}") +async def patch_memory( + memory_id: str, + patch: MemoryPatchRequest, + user_id: str = Query(...), + agent_id: Optional[str] = Query(default=None), + workspace_id: Optional[str] = Query(default=None), + session_id: Optional[str] = Query(default=None), +): + return service.patch_memory(memory_id, AccessContext(user_id=user_id, agent_id=agent_id, workspace_id=workspace_id, session_id=session_id), patch) + + +@router.delete("/memory/{memory_id}") +async def delete_memory( + memory_id: str, + user_id: str = Query(...), + agent_id: Optional[str] = Query(default=None), + workspace_id: Optional[str] = Query(default=None), + session_id: Optional[str] = Query(default=None), +): + return service.delete_memory(memory_id, AccessContext(user_id=user_id, agent_id=agent_id, workspace_id=workspace_id, 
session_id=session_id)) + + +@router.post("/episodes") +async def append_episode(request: EpisodeAppendRequest): + return service.append_episode(request) + + +@router.post("/sessions/{session_id}/commit") +async def commit_session(session_id: str, request: CommitSessionRequest): + return service.commit_session(session_id, request) + + +@router.get("/users/{user_id}/profile") +async def get_profile(user_id: str): + return service.get_profile(user_id) + + +@router.post("/memory/{memory_id}/feedback") +async def memory_feedback(memory_id: str, request: MemoryFeedbackRequest): + return service.add_feedback(memory_id, request) + + +@router.get("/namespaces") +async def list_namespaces( + user_id: str = Query(...), + agent_id: Optional[str] = Query(default=None), + workspace_id: Optional[str] = Query(default=None), + session_id: Optional[str] = Query(default=None), +): + return service.list_namespaces(AccessContext(user_id=user_id, agent_id=agent_id, workspace_id=workspace_id, session_id=session_id)) + + +@router.get("/audit") +async def list_audit(limit: int = Query(default=100, ge=1, le=1000)): + return service.list_audit(limit) + + +@router.get("/evermemos/health") +async def evermemos_health(): + return service.evermemos_health() diff --git a/memory_gateway/config.py b/memory_gateway/config.py index 0234307..1ca243a 100644 --- a/memory_gateway/config.py +++ b/memory_gateway/config.py @@ -6,7 +6,7 @@ from typing import Optional import yaml from pydantic import ValidationError -from .types import Config, ServerConfig, OpenVikingConfig, MemoryConfig, LoggingConfig, LLMConfig, ObsidianConfig +from .types import Config, ServerConfig, OpenVikingConfig, EverMemOSConfig, MemoryConfig, LoggingConfig, LLMConfig, ObsidianConfig, StorageConfig def load_config(config_path: Optional[str] = None) -> Config: @@ -30,10 +30,12 @@ def load_config(config_path: Optional[str] = None) -> Config: return Config( server=ServerConfig(**data.get("server", {})), 
openviking=OpenVikingConfig(**data.get("openviking", {})), + evermemos=EverMemOSConfig(**data.get("evermemos", {})), memory=MemoryConfig(**data.get("memory", {})), logging=LoggingConfig(**data.get("logging", {})), llm=LLMConfig(**data.get("llm", {})), obsidian=ObsidianConfig(**data.get("obsidian", {})), + storage=StorageConfig(**data.get("storage", {})), ) except (ValidationError, yaml.YAMLError) as e: print(f"配置文件解析错误: {e}") diff --git a/memory_gateway/evermemos_client.py b/memory_gateway/evermemos_client.py new file mode 100644 index 0000000..169fbe6 --- /dev/null +++ b/memory_gateway/evermemos_client.py @@ -0,0 +1,113 @@ +"""Client for the external EverMemOS consolidation service.""" +from __future__ import annotations + +from typing import Any + +import httpx + +from .config import get_config +from .schemas import AccessContext, EpisodeRecord, MemoryRecord + + +class EverMemOSError(RuntimeError): + """Raised when the external EverMemOS service cannot consolidate.""" + + +class EverMemOSClient: + """Small HTTP client with a tolerant response normalizer. + + The deployed EverMemOS API may evolve independently from Memory Gateway. + Gateway sends a stable payload and accepts several common response shapes: + `result`, `data`, or the raw top-level object with `candidates/promoted`. 
+ """ + + def __init__( + self, + base_url: str | None = None, + api_key: str | None = None, + timeout: int | None = None, + health_path: str | None = None, + consolidate_path: str | None = None, + ) -> None: + config = get_config().evermemos + self.base_url = (base_url or config.url).rstrip("/") + self.api_key = api_key if api_key is not None else config.api_key + self.timeout = timeout or config.timeout + self.health_path = health_path or config.health_path + self.consolidate_path = consolidate_path or config.consolidate_path + + def _headers(self) -> dict[str, str]: + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["X-API-Key"] = self.api_key + headers["Authorization"] = f"Bearer {self.api_key}" + return headers + + def health(self) -> dict[str, Any]: + url = self.base_url + self.health_path + try: + with httpx.Client(timeout=self.timeout, headers=self._headers()) as client: + response = client.get(url) + response.raise_for_status() + return {"status": "ok", "url": self.base_url, "response": response.json()} + except Exception as exc: # noqa: BLE001 + return {"status": "error", "url": self.base_url, "error": str(exc)} + + def consolidate_session( + self, + session_id: str, + ctx: AccessContext, + episodes: list[EpisodeRecord], + existing_memories: list[MemoryRecord], + min_importance: float, + target_namespace: str | None, + ) -> dict[str, Any]: + payload = { + "schema_version": "memory-gateway.evermemos.consolidate.v1", + "session_id": session_id, + "context": ctx.model_dump(mode="json"), + "min_importance": min_importance, + "target_namespace": target_namespace, + "episodes": [episode.model_dump(mode="json") for episode in episodes], + "existing_memories": [memory.model_dump(mode="json") for memory in existing_memories], + } + paths = [ + self.consolidate_path, + "/v1/sessions/consolidate", + "/v1/memory/consolidate", + "/api/v1/sessions/consolidate", + "/api/consolidate", + "/consolidate", + ] + errors: list[str] = [] + for path in 
dict.fromkeys(paths): + try: + with httpx.Client(timeout=self.timeout, headers=self._headers()) as client: + response = client.post(self.base_url + path, json=payload) + if response.status_code == 404: + errors.append(f"{path}: 404") + continue + response.raise_for_status() + return self._normalize_response(response.json(), path) + except Exception as exc: # noqa: BLE001 + errors.append(f"{path}: {exc}") + if "Connection refused" in str(exc) or "timed out" in str(exc): + break + raise EverMemOSError("; ".join(errors) or "EverMemOS consolidation failed") + + def _normalize_response(self, payload: dict[str, Any], path: str) -> dict[str, Any]: + data = payload.get("result") or payload.get("data") or payload + return { + "backend": "external", + "service_url": self.base_url, + "endpoint": path, + "raw": payload, + "session_id": data.get("session_id"), + "episodes": data.get("episodes"), + "candidates": data.get("candidates") or data.get("candidate_memories") or [], + "promoted": data.get("promoted") or data.get("promoted_memories") or data.get("memories") or [], + "duplicates": data.get("duplicates") or [], + "conflicts": data.get("conflicts") or [], + "review_drafts": data.get("review_drafts") or [], + } + diff --git a/memory_gateway/evermemos_service.py b/memory_gateway/evermemos_service.py new file mode 100644 index 0000000..840ccad --- /dev/null +++ b/memory_gateway/evermemos_service.py @@ -0,0 +1,108 @@ +"""Standalone EverMemOS-compatible consolidation service. + +This is a lightweight local service for POC use. It intentionally exposes the +same HTTP contract that Memory Gateway calls: + +POST /v1/sessions/consolidate + +The service does not own Memory Gateway's metadata database. It receives +episodes and existing memories in the request, returns candidate/promoted +MemoryRecord payloads, and creates Obsidian review drafts for high-value or +conflicting candidates. 
+""" +from __future__ import annotations + +import argparse +import logging +from typing import Any + +from fastapi import FastAPI +from pydantic import BaseModel, Field + +from .config import load_config, set_config +from .repositories import InMemoryRepository +from .schemas import AccessContext, EpisodeRecord, MemoryRecord +from .workers.evermemos_worker import EverMemOSWorker + +logger = logging.getLogger(__name__) + + +class ConsolidateRequest(BaseModel): + schema_version: str = "memory-gateway.evermemos.consolidate.v1" + session_id: str + context: dict[str, Any] + min_importance: float = 0.6 + target_namespace: str | None = None + episodes: list[dict[str, Any]] = Field(default_factory=list) + existing_memories: list[dict[str, Any]] = Field(default_factory=list) + + +app = FastAPI(title="Local EverMemOS POC Service", version="0.1.0") + + +@app.get("/health") +async def health() -> dict[str, Any]: + return { + "status": "ok", + "service": "evermemos-local", + "version": "0.1.0", + "contract": "memory-gateway.evermemos.consolidate.v1", + } + + +@app.post("/v1/sessions/consolidate") +async def consolidate_session(request: ConsolidateRequest) -> dict[str, Any]: + repo = InMemoryRepository() + ctx = AccessContext.model_validate(request.context) + + for item in request.existing_memories: + try: + repo.upsert_memory(MemoryRecord.model_validate(item)) + except Exception as exc: # noqa: BLE001 + logger.warning("Skipping invalid existing memory: %s", exc) + + for item in request.episodes: + try: + repo.append_episode(EpisodeRecord.model_validate(item)) + except Exception as exc: # noqa: BLE001 + logger.warning("Skipping invalid episode: %s", exc) + + worker = EverMemOSWorker(repo) + result = worker.consolidate_session( + session_id=request.session_id, + ctx=ctx, + min_importance=request.min_importance, + target_namespace=request.target_namespace, + ) + return { + "status": "ok", + "backend": "evermemos-local", + "result": { + "session_id": result.session_id, + 
"episodes": result.episodes, + "candidates": [memory.model_dump(mode="json") for memory in result.candidates], + "promoted": [memory.model_dump(mode="json") for memory in result.promoted], + "duplicates": result.duplicates, + "conflicts": result.conflicts, + "review_drafts": result.review_drafts, + }, + } + + +def main() -> None: + import uvicorn + + parser = argparse.ArgumentParser(description="Run the local EverMemOS POC service.") + parser.add_argument("--config", default="config.yaml") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=1995) + args = parser.parse_args() + + config = load_config(args.config) + set_config(config) + uvicorn.run(app, host=args.host, port=args.port, log_level=config.logging.level.lower()) + + +if __name__ == "__main__": + main() + diff --git a/memory_gateway/mcp_tools_v1.py b/memory_gateway/mcp_tools_v1.py new file mode 100644 index 0000000..05e39f3 --- /dev/null +++ b/memory_gateway/mcp_tools_v1.py @@ -0,0 +1,135 @@ +"""MCP tool definitions for the generic Memory Gateway contract. + +The legacy MCP endpoint in server.py remains available. These definitions are +the target v1 tool contract for Nanobot, Hermes Agent, OpenClaw, and other +agent frameworks. 
+""" + +MEMORY_GATEWAY_MCP_TOOLS = [ + { + "name": "memory_search", + "description": "Search accessible memories with user/agent/workspace/session isolation.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "session_id": {"type": "string"}, + "query": {"type": "string"}, + "namespaces": {"type": "array", "items": {"type": "string"}}, + "limit": {"type": "integer", "default": 10}, + }, + "required": ["user_id", "query"], + }, + }, + { + "name": "memory_upsert", + "description": "Create or update a memory record after ACL and namespace routing.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "session_id": {"type": "string"}, + "namespace": {"type": "string"}, + "memory_type": {"type": "string"}, + "content": {"type": "string"}, + "summary": {"type": "string"}, + "tags": {"type": "array", "items": {"type": "string"}}, + "importance": {"type": "number"}, + "confidence": {"type": "number"}, + "visibility": {"type": "string"}, + }, + "required": ["user_id", "content"], + }, + }, + { + "name": "memory_append_episode", + "description": "Append temporary episode/session memory without automatically promoting it.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "session_id": {"type": "string"}, + "content": {"type": "string"}, + "events": {"type": "array", "items": {"type": "object"}}, + "tags": {"type": "array", "items": {"type": "string"}}, + }, + "required": ["user_id", "session_id", "content"], + }, + }, + { + "name": "memory_commit_session", + "description": "Promote selected session memories into long-term memory via consolidation.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + 
"agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "session_id": {"type": "string"}, + "promote": {"type": "boolean", "default": True}, + "min_importance": {"type": "number", "default": 0.6}, + }, + "required": ["user_id", "session_id"], + }, + }, + { + "name": "memory_get_profile", + "description": "Get the effective user profile memory.", + "inputSchema": { + "type": "object", + "properties": {"user_id": {"type": "string"}}, + "required": ["user_id"], + }, + }, + { + "name": "memory_list_namespaces", + "description": "List namespaces visible to the current user/agent/workspace context.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "session_id": {"type": "string"}, + }, + "required": ["user_id"], + }, + }, + { + "name": "memory_delete", + "description": "Delete or archive a memory record if the caller has access.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "memory_id": {"type": "string"}, + }, + "required": ["user_id", "memory_id"], + }, + }, + { + "name": "memory_feedback", + "description": "Attach quality feedback to a memory record for pruning/merge decisions.", + "inputSchema": { + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "agent_id": {"type": "string"}, + "workspace_id": {"type": "string"}, + "memory_id": {"type": "string"}, + "feedback": {"type": "string"}, + "comment": {"type": "string"}, + }, + "required": ["user_id", "memory_id", "feedback"], + }, + }, +] + diff --git a/memory_gateway/namespace.py b/memory_gateway/namespace.py new file mode 100644 index 0000000..9226a61 --- /dev/null +++ b/memory_gateway/namespace.py @@ -0,0 +1,115 @@ +"""Namespace construction and access checks for Memory Gateway.""" +from __future__ import annotations + +from datetime import 
datetime, timezone + +from .schemas import AccessContext, MemoryRecord, NamespaceInfo, Visibility + + +def user_profile_namespace(user_id: str) -> str: + return f"user/{user_id}/profile" + + +def user_preferences_namespace(user_id: str) -> str: + return f"user/{user_id}/preferences" + + +def user_long_term_namespace(user_id: str) -> str: + return f"user/{user_id}/long_term" + + +def agent_memory_namespace(agent_id: str) -> str: + return f"agent/{agent_id}/memory" + + +def workspace_shared_namespace(workspace_id: str) -> str: + return f"workspace/{workspace_id}/shared" + + +def session_episodic_namespace(session_id: str) -> str: + return f"session/{session_id}/episodic" + + +def global_public_namespace() -> str: + return "global/public" + + +def default_namespace_for_context(ctx: AccessContext, visibility: Visibility) -> str: + if visibility == Visibility.AGENT_ONLY and ctx.agent_id: + return agent_memory_namespace(ctx.agent_id) + if visibility == Visibility.WORKSPACE_SHARED and ctx.workspace_id: + return workspace_shared_namespace(ctx.workspace_id) + if ctx.session_id: + return session_episodic_namespace(ctx.session_id) + return user_long_term_namespace(ctx.user_id) + + +def can_access_memory(ctx: AccessContext, memory: MemoryRecord) -> bool: + if memory.expires_at and memory.expires_at <= datetime.now(timezone.utc): + return False + if memory.visibility == Visibility.GLOBAL: + return True + if memory.visibility == Visibility.PRIVATE: + return memory.user_id == ctx.user_id + if memory.visibility == Visibility.AGENT_ONLY: + return memory.user_id == ctx.user_id and memory.agent_id == ctx.agent_id + if memory.visibility == Visibility.WORKSPACE_SHARED: + return memory.workspace_id is not None and memory.workspace_id == ctx.workspace_id + return False + + +def visible_namespaces(ctx: AccessContext) -> list[NamespaceInfo]: + namespaces = [ + NamespaceInfo( + namespace=user_profile_namespace(ctx.user_id), + owner_user_id=ctx.user_id, + visibility=Visibility.PRIVATE, + 
description="用户 profile 与稳定偏好", + ), + NamespaceInfo( + namespace=user_preferences_namespace(ctx.user_id), + owner_user_id=ctx.user_id, + visibility=Visibility.PRIVATE, + description="用户显式偏好", + ), + NamespaceInfo( + namespace=user_long_term_namespace(ctx.user_id), + owner_user_id=ctx.user_id, + visibility=Visibility.PRIVATE, + description="用户长期记忆", + ), + NamespaceInfo( + namespace=global_public_namespace(), + visibility=Visibility.GLOBAL, + description="全局公开知识", + ), + ] + if ctx.agent_id: + namespaces.append( + NamespaceInfo( + namespace=agent_memory_namespace(ctx.agent_id), + owner_user_id=ctx.user_id, + visibility=Visibility.AGENT_ONLY, + description="指定 agent 私有经验", + ) + ) + if ctx.workspace_id: + namespaces.append( + NamespaceInfo( + namespace=workspace_shared_namespace(ctx.workspace_id), + owner_user_id=ctx.user_id, + visibility=Visibility.WORKSPACE_SHARED, + description="workspace / project 共享记忆", + ) + ) + if ctx.session_id: + namespaces.append( + NamespaceInfo( + namespace=session_episodic_namespace(ctx.session_id), + owner_user_id=ctx.user_id, + visibility=Visibility.PRIVATE, + description="session 临时 episodic memory", + ) + ) + return namespaces + diff --git a/memory_gateway/obsidian_review.py b/memory_gateway/obsidian_review.py new file mode 100644 index 0000000..4764615 --- /dev/null +++ b/memory_gateway/obsidian_review.py @@ -0,0 +1,77 @@ +"""Obsidian review draft writer.""" +from __future__ import annotations + +import re +from datetime import datetime, timezone +from pathlib import Path + +from .config import get_config +from .schemas import MemoryRecord + + +def _slugify(value: str, fallback: str) -> str: + slug = re.sub(r"[^a-zA-Z0-9\u4e00-\u9fff_-]+", "-", value.lower()).strip("-") + slug = re.sub(r"-+", "-", slug)[:80].strip("-") + return slug or fallback + + +def write_review_draft(memory: MemoryRecord, reason: str, conflict_ids: list[str] | None = None) -> Path: + config = get_config() + review_dir = getattr(config.obsidian, "review_dir", 
"Reviews/Queue") + vault_path = Path(config.obsidian.vault_path) + target_dir = vault_path / review_dir + target_dir.mkdir(parents=True, exist_ok=True) + + title = memory.summary or memory.content[:80] or memory.id + filename = f"{_slugify(title, memory.id)}-{memory.id}.md" + path = target_dir / filename + conflict_ids = conflict_ids or [] + + content = "\n".join( + [ + "---", + "type: memory_review", + f"memory_id: {memory.id}", + f"user_id: {memory.user_id}", + f"agent_id: {memory.agent_id or ''}", + f"workspace_id: {memory.workspace_id or ''}", + f"namespace: {memory.namespace}", + f"visibility: {memory.visibility.value}", + f"importance: {memory.importance}", + f"confidence: {memory.confidence}", + f"reason: {reason}", + f"created_at: {datetime.now(timezone.utc).isoformat()}", + "tags:", + " - memory/review", + " - source/evermemos", + "---", + "", + f"# Memory Review - {title}", + "", + "## Candidate", + "", + memory.content, + "", + "## Summary", + "", + memory.summary or "", + "", + "## Proposed Action", + "", + "- [ ] Accept", + "- [ ] Edit", + "- [ ] Reject", + "- [ ] Merge", + "- [ ] Archive", + "", + "## Conflict IDs", + "", + "\n".join(f"- {memory_id}" for memory_id in conflict_ids) if conflict_ids else "- none", + "", + "## Notes", + "", + ] + ) + path.write_text(content, encoding="utf-8") + return path + diff --git a/memory_gateway/repositories.py b/memory_gateway/repositories.py new file mode 100644 index 0000000..ac9c71b --- /dev/null +++ b/memory_gateway/repositories.py @@ -0,0 +1,328 @@ +"""Metadata repositories for Memory Gateway. + +SQLite is the default POC store. The in-memory implementation is retained for +small isolated tests and for cases where persistence is explicitly disabled. 
+""" +from __future__ import annotations + +import json +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, Optional, Protocol + +from .config import get_config +from .schemas import AuditLog, EpisodeRecord, MemoryRecord, ProfileRecord, UserRecord + + +class MetadataRepository(Protocol): + def create_user(self, user: UserRecord) -> UserRecord: ... + def get_user(self, user_id: str) -> Optional[UserRecord]: ... + def upsert_memory(self, memory: MemoryRecord) -> MemoryRecord: ... + def get_memory(self, memory_id: str) -> Optional[MemoryRecord]: ... + def delete_memory(self, memory_id: str) -> bool: ... + def list_memories(self) -> Iterable[MemoryRecord]: ... + def append_episode(self, episode: EpisodeRecord) -> EpisodeRecord: ... + def list_session_episodes(self, session_id: str) -> list[EpisodeRecord]: ... + def get_profile(self, user_id: str) -> Optional[ProfileRecord]: ... + def upsert_profile(self, profile: ProfileRecord) -> ProfileRecord: ... + def add_audit(self, audit: AuditLog) -> AuditLog: ... + def list_audit(self, limit: int = 100) -> list[AuditLog]: ... 
+ + +def _json_dump_model(model) -> str: + return json.dumps(model.model_dump(mode="json"), ensure_ascii=False) + + +def _json_load_model(model_cls, payload: str): + return model_cls.model_validate(json.loads(payload)) + + +class InMemoryRepository: + def __init__(self) -> None: + self.users: dict[str, UserRecord] = {} + self.memories: dict[str, MemoryRecord] = {} + self.episodes: dict[str, EpisodeRecord] = {} + self.profiles: dict[str, ProfileRecord] = {} + self.audit_logs: list[AuditLog] = [] + + def create_user(self, user: UserRecord) -> UserRecord: + now = datetime.now(timezone.utc) + user.created_at = now + user.updated_at = now + self.users[user.id] = user + self.profiles.setdefault( + user.id, + ProfileRecord(user_id=user.id, namespace=user.profile_namespace or f"user/{user.id}/profile"), + ) + return user + + def get_user(self, user_id: str) -> Optional[UserRecord]: + return self.users.get(user_id) + + def upsert_memory(self, memory: MemoryRecord) -> MemoryRecord: + now = datetime.now(timezone.utc) + existing = self.memories.get(memory.id) + if existing: + memory.version = existing.version + 1 + memory.created_at = existing.created_at + memory.updated_at = now + self.memories[memory.id] = memory + return memory + + def get_memory(self, memory_id: str) -> Optional[MemoryRecord]: + return self.memories.get(memory_id) + + def delete_memory(self, memory_id: str) -> bool: + return self.memories.pop(memory_id, None) is not None + + def list_memories(self) -> Iterable[MemoryRecord]: + return list(self.memories.values()) + + def append_episode(self, episode: EpisodeRecord) -> EpisodeRecord: + self.episodes[episode.id] = episode + return episode + + def list_session_episodes(self, session_id: str) -> list[EpisodeRecord]: + return [episode for episode in self.episodes.values() if episode.session_id == session_id] + + def get_profile(self, user_id: str) -> Optional[ProfileRecord]: + return self.profiles.get(user_id) + + def upsert_profile(self, profile: ProfileRecord) 
-> ProfileRecord: + profile.updated_at = datetime.now(timezone.utc) + profile.version += 1 + self.profiles[profile.user_id] = profile + return profile + + def add_audit(self, audit: AuditLog) -> AuditLog: + self.audit_logs.append(audit) + return audit + + def list_audit(self, limit: int = 100) -> list[AuditLog]: + return self.audit_logs[-limit:] + + +class SQLiteRepository: + def __init__(self, db_path: str | Path) -> None: + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._init_schema() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + return conn + + def _init_schema(self) -> None: + with self._connect() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + payload TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS profiles ( + user_id TEXT PRIMARY KEY, + payload TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS memories ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + agent_id TEXT, + workspace_id TEXT, + session_id TEXT, + namespace TEXT NOT NULL, + memory_type TEXT NOT NULL, + visibility TEXT NOT NULL, + importance REAL NOT NULL, + confidence REAL NOT NULL, + expires_at TEXT, + archived_at TEXT, + payload TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_memories_user ON memories(user_id); + CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace); + CREATE INDEX IF NOT EXISTS idx_memories_workspace ON memories(workspace_id); + CREATE TABLE IF NOT EXISTS episodes ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + agent_id TEXT, + workspace_id TEXT, + session_id TEXT NOT NULL, + namespace TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_episodes_session ON episodes(session_id); + CREATE TABLE IF NOT EXISTS audit_logs ( + id TEXT PRIMARY KEY, + 
actor_user_id TEXT, + actor_agent_id TEXT, + action TEXT NOT NULL, + target_type TEXT NOT NULL, + target_id TEXT, + namespace TEXT, + decision TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_audit_created ON audit_logs(created_at); + """ + ) + + def create_user(self, user: UserRecord) -> UserRecord: + now = datetime.now(timezone.utc) + user.created_at = user.created_at or now + user.updated_at = now + with self._connect() as conn: + conn.execute( + "INSERT OR REPLACE INTO users(id, payload, updated_at) VALUES (?, ?, ?)", + (user.id, _json_dump_model(user), user.updated_at.isoformat()), + ) + self.upsert_profile(ProfileRecord(user_id=user.id, namespace=user.profile_namespace or f"user/{user.id}/profile")) + return user + + def get_user(self, user_id: str) -> Optional[UserRecord]: + with self._connect() as conn: + row = conn.execute("SELECT payload FROM users WHERE id = ?", (user_id,)).fetchone() + return _json_load_model(UserRecord, row["payload"]) if row else None + + def upsert_memory(self, memory: MemoryRecord) -> MemoryRecord: + existing = self.get_memory(memory.id) + now = datetime.now(timezone.utc) + if existing: + memory.version = existing.version + 1 + memory.created_at = existing.created_at + memory.updated_at = now + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO memories( + id, user_id, agent_id, workspace_id, session_id, namespace, + memory_type, visibility, importance, confidence, expires_at, + archived_at, payload, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + memory.id, + memory.user_id, + memory.agent_id, + memory.workspace_id, + memory.session_id, + memory.namespace, + memory.memory_type.value, + memory.visibility.value, + memory.importance, + memory.confidence, + memory.expires_at.isoformat() if memory.expires_at else None, + memory.archived_at.isoformat() if memory.archived_at else None, + _json_dump_model(memory), + memory.updated_at.isoformat(), + ), + ) + return memory + + def get_memory(self, memory_id: str) -> Optional[MemoryRecord]: + with self._connect() as conn: + row = conn.execute("SELECT payload FROM memories WHERE id = ?", (memory_id,)).fetchone() + return _json_load_model(MemoryRecord, row["payload"]) if row else None + + def delete_memory(self, memory_id: str) -> bool: + with self._connect() as conn: + cursor = conn.execute("DELETE FROM memories WHERE id = ?", (memory_id,)) + return cursor.rowcount > 0 + + def list_memories(self) -> Iterable[MemoryRecord]: + with self._connect() as conn: + rows = conn.execute("SELECT payload FROM memories").fetchall() + return [_json_load_model(MemoryRecord, row["payload"]) for row in rows] + + def append_episode(self, episode: EpisodeRecord) -> EpisodeRecord: + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO episodes( + id, user_id, agent_id, workspace_id, session_id, namespace, payload, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + episode.id, + episode.user_id, + episode.agent_id, + episode.workspace_id, + episode.session_id, + episode.namespace, + _json_dump_model(episode), + episode.created_at.isoformat(), + ), + ) + return episode + + def list_session_episodes(self, session_id: str) -> list[EpisodeRecord]: + with self._connect() as conn: + rows = conn.execute( + "SELECT payload FROM episodes WHERE session_id = ? 
ORDER BY created_at ASC", + (session_id,), + ).fetchall() + return [_json_load_model(EpisodeRecord, row["payload"]) for row in rows] + + def get_profile(self, user_id: str) -> Optional[ProfileRecord]: + with self._connect() as conn: + row = conn.execute("SELECT payload FROM profiles WHERE user_id = ?", (user_id,)).fetchone() + return _json_load_model(ProfileRecord, row["payload"]) if row else None + + def upsert_profile(self, profile: ProfileRecord) -> ProfileRecord: + profile.updated_at = datetime.now(timezone.utc) + with self._connect() as conn: + conn.execute( + "INSERT OR REPLACE INTO profiles(user_id, payload, updated_at) VALUES (?, ?, ?)", + (profile.user_id, _json_dump_model(profile), profile.updated_at.isoformat()), + ) + return profile + + def add_audit(self, audit: AuditLog) -> AuditLog: + with self._connect() as conn: + conn.execute( + """ + INSERT OR REPLACE INTO audit_logs( + id, actor_user_id, actor_agent_id, action, target_type, target_id, + namespace, decision, payload, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + audit.id, + audit.actor_user_id, + audit.actor_agent_id, + audit.action, + audit.target_type, + audit.target_id, + audit.namespace, + audit.decision, + _json_dump_model(audit), + audit.created_at.isoformat(), + ), + ) + return audit + + def list_audit(self, limit: int = 100) -> list[AuditLog]: + with self._connect() as conn: + rows = conn.execute( + "SELECT payload FROM audit_logs ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [_json_load_model(AuditLog, row["payload"]) for row in rows] + + +def build_repository() -> MetadataRepository: + config = get_config() + if config.storage.backend == "memory": + return InMemoryRepository() + return SQLiteRepository(config.storage.sqlite_path) + + +repository = build_repository() + diff --git a/memory_gateway/schemas.py b/memory_gateway/schemas.py new file mode 100644 index 0000000..6c81fce --- /dev/null +++ b/memory_gateway/schemas.py @@ -0,0 +1,227 @@ +"""Core schemas for the generic Memory Gateway v1 API.""" +from __future__ import annotations + +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Literal, Optional +from uuid import uuid4 + +from pydantic import BaseModel, Field + + +def utc_now() -> datetime: + return datetime.now(timezone.utc) + + +class Visibility(str, Enum): + PRIVATE = "private" + AGENT_ONLY = "agent-only" + WORKSPACE_SHARED = "workspace-shared" + GLOBAL = "global" + + +class MemoryType(str, Enum): + PROFILE = "profile" + PREFERENCE = "preference" + FACT = "fact" + DECISION = "decision" + SUMMARY = "summary" + EPISODIC = "episodic" + PROCEDURE = "procedure" + EXPERIENCE = "experience" + KNOWLEDGE = "knowledge" + + +class SourceType(str, Enum): + CONVERSATION = "conversation" + TASK = "task" + AGENT = "agent" + OBSIDIAN = "obsidian" + OPENVIKING = "openviking" + EVERMEMOS = "evermemos" + MANUAL = "manual" + + +class UserRecord(BaseModel): + id: str = Field(default_factory=lambda: f"user_{uuid4().hex[:12]}") + display_name: str + 
# ---- memory_gateway/schemas.py ----
"""Core schemas for the generic Memory Gateway v1 API."""
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum
from typing import Any, Literal, Optional
from uuid import uuid4

from pydantic import BaseModel, Field


def utc_now() -> datetime:
    # Timezone-aware UTC "now"; every persisted timestamp uses this.
    return datetime.now(timezone.utc)


# Who may read a memory; enforced by namespace.can_access_memory.
class Visibility(str, Enum):
    PRIVATE = "private"
    AGENT_ONLY = "agent-only"
    WORKSPACE_SHARED = "workspace-shared"
    GLOBAL = "global"


# Broad classification of what a memory holds.
class MemoryType(str, Enum):
    PROFILE = "profile"
    PREFERENCE = "preference"
    FACT = "fact"
    DECISION = "decision"
    SUMMARY = "summary"
    EPISODIC = "episodic"
    PROCEDURE = "procedure"
    EXPERIENCE = "experience"
    KNOWLEDGE = "knowledge"


# Where a record originally came from.
class SourceType(str, Enum):
    CONVERSATION = "conversation"
    TASK = "task"
    AGENT = "agent"
    OBSIDIAN = "obsidian"
    OPENVIKING = "openviking"
    EVERMEMOS = "evermemos"
    MANUAL = "manual"


# A gateway end user; owner of the user/* namespaces.
class UserRecord(BaseModel):
    id: str = Field(default_factory=lambda: f"user_{uuid4().hex[:12]}")
    display_name: str
    status: Literal["active", "disabled"] = "active"
    # When unset, repositories default this to user/<id>/profile.
    profile_namespace: Optional[str] = None
    preferences: dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)


# A registered agent (Nanobot, Hermes, ...) that may own an agent namespace.
class AgentRecord(BaseModel):
    id: str
    name: str
    framework: str
    owner_user_id: Optional[str] = None
    created_at: datetime = Field(default_factory=utc_now)


# A shared workspace/project scope for workspace-shared memories.
class WorkspaceRecord(BaseModel):
    id: str
    name: str
    owner_user_id: str
    member_user_ids: list[str] = Field(default_factory=list)
    allowed_agent_ids: list[str] = Field(default_factory=list)
    created_at: datetime = Field(default_factory=utc_now)


# One conversation/task session; episodes accumulate here until commit.
class SessionRecord(BaseModel):
    id: str = Field(default_factory=lambda: f"sess_{uuid4().hex[:12]}")
    user_id: str
    agent_id: Optional[str] = None
    workspace_id: Optional[str] = None
    status: Literal["open", "committed", "expired"] = "open"
    expires_at: Optional[datetime] = None
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)


# Per-record access rule. NOTE(review): the allow-lists are not yet consulted
# by namespace.can_access_memory — confirm whether enforcement is deferred.
class ACLRule(BaseModel):
    visibility: Visibility = Visibility.PRIVATE
    allowed_user_ids: list[str] = Field(default_factory=list)
    allowed_agent_ids: list[str] = Field(default_factory=list)
    allowed_workspace_ids: list[str] = Field(default_factory=list)


# A long-term memory item; the unit stored/searched by the gateway.
class MemoryRecord(BaseModel):
    id: str = Field(default_factory=lambda: f"mem_{uuid4().hex[:16]}")
    user_id: str
    agent_id: Optional[str] = None
    workspace_id: Optional[str] = None
    session_id: Optional[str] = None
    namespace: str
    memory_type: MemoryType = MemoryType.FACT
    content: str
    summary: Optional[str] = None
    tags: list[str] = Field(default_factory=list)
    importance: float = Field(default=0.5, ge=0, le=1)
    confidence: float = Field(default=0.8, ge=0, le=1)
    visibility: Visibility = Visibility.PRIVATE
    acl: ACLRule = Field(default_factory=ACLRule)
    source: SourceType = SourceType.MANUAL
    source_ref: Optional[str] = None
    embedding_ref: Optional[str] = None
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)
    # Expired records are filtered out by can_access_memory.
    expires_at: Optional[datetime] = None
    archived_at: Optional[datetime] = None
    # Bumped by repositories on every upsert of an existing id.
    version: int = 1


# Temporary session-scoped memory; promoted only via session commit.
class EpisodeRecord(BaseModel):
    id: str = Field(default_factory=lambda: f"epi_{uuid4().hex[:16]}")
    user_id: str
    agent_id: Optional[str] = None
    workspace_id: Optional[str] = None
    session_id: str
    namespace: str
    content: str
    summary: Optional[str] = None
    events: list[dict[str, Any]] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    source: SourceType = SourceType.CONVERSATION
    created_at: datetime = Field(default_factory=utc_now)
    expires_at: Optional[datetime] = None


# Consolidated user profile derived from promoted memories.
class ProfileRecord(BaseModel):
    id: str = Field(default_factory=lambda: f"profile_{uuid4().hex[:12]}")
    user_id: str
    namespace: str
    display_name: Optional[str] = None
    stable_facts: list[str] = Field(default_factory=list)
    preferences: dict[str, Any] = Field(default_factory=dict)
    working_style: list[str] = Field(default_factory=list)
    updated_from_memory_ids: list[str] = Field(default_factory=list)
    version: int = 1
    updated_at: datetime = Field(default_factory=utc_now)


# Immutable record of an access decision or mutation.
class AuditLog(BaseModel):
    id: str = Field(default_factory=lambda: f"audit_{uuid4().hex[:16]}")
    actor_user_id: Optional[str] = None
    actor_agent_id: Optional[str] = None
    action: str
    target_type: str
    target_id: Optional[str] = None
    namespace: Optional[str] = None
    decision: Literal["allow", "deny"] = "allow"
    reason: Optional[str] = None
    metadata: dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=utc_now)


# Caller identity/scope; base class for most request models below.
class AccessContext(BaseModel):
    user_id: str
    agent_id: Optional[str] = None
    workspace_id: Optional[str] = None
    session_id: Optional[str] = None


class CreateUserRequest(BaseModel):
    display_name: str
    # Optional explicit id; server generates one when absent.
    user_id: Optional[str] = None
    preferences: dict[str, Any] = Field(default_factory=dict)


class MemorySearchRequest(AccessContext):
    query: str
    # Empty list means "all namespaces visible to the context".
    namespaces: list[str] = Field(default_factory=list)
    memory_types: list[MemoryType] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    limit: int = Field(default=10, ge=1, le=100)


class MemoryUpsertRequest(AccessContext):
    # When unset, the server routes via namespace.default_namespace_for_context.
    namespace: Optional[str] = None
    memory_type: MemoryType = MemoryType.FACT
    content: str
    summary: Optional[str] = None
    tags: list[str] = Field(default_factory=list)
    importance: float = Field(default=0.5, ge=0, le=1)
    confidence: float = Field(default=0.8, ge=0, le=1)
    visibility: Visibility = Visibility.PRIVATE
    source: SourceType = SourceType.MANUAL
    expires_at: Optional[datetime] = None


# Partial update: every field optional; None means "leave unchanged".
class MemoryPatchRequest(BaseModel):
    content: Optional[str] = None
    summary: Optional[str] = None
    tags: Optional[list[str]] = None
    importance: Optional[float] = Field(default=None, ge=0, le=1)
    confidence: Optional[float] = Field(default=None, ge=0, le=1)
    visibility: Optional[Visibility] = None
    expires_at: Optional[datetime] = None


class EpisodeAppendRequest(AccessContext):
    content: str
    namespace: Optional[str] = None
    events: list[dict[str, Any]] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    source: SourceType = SourceType.CONVERSATION
    expires_at: Optional[datetime] = None


class CommitSessionRequest(AccessContext):
    promote: bool = True
    # Episodes below this importance are not promoted to long-term memory.
    min_importance: float = Field(default=0.6, ge=0, le=1)
    target_namespace: Optional[str] = None


class MemoryFeedbackRequest(AccessContext):
    feedback: Literal["useful", "not_useful", "incorrect", "duplicate", "outdated"]
    comment: Optional[str] = None


# Descriptor returned by the list-namespaces endpoints/tools.
class NamespaceInfo(BaseModel):
    namespace: str
    owner_user_id: Optional[str] = None
    visibility: Visibility
    description: str
dac40b5..abe793a 100644 --- a/memory_gateway/server.py +++ b/memory_gateway/server.py @@ -24,6 +24,16 @@ from .config import get_config, set_config, Config from .openviking_client import get_openviking_client, close_openviking_client from .document_ingest import convert_file_to_markdown, save_markdown_to_obsidian, slugify from .llm import LLMConfigurationError, LLMSummaryError, summarize_with_llm +from .mcp_tools_v1 import MEMORY_GATEWAY_MCP_TOOLS +from .schemas import ( + AccessContext, + CommitSessionRequest, + EpisodeAppendRequest, + MemoryFeedbackRequest, + MemorySearchRequest, + MemoryUpsertRequest, +) +from .services import service as v1_service from .types import SearchRequest, AddMemoryRequest, AddResourceRequest, CommitSummaryRequest # 配置日志 @@ -41,7 +51,7 @@ mcp_server = Server("memory-gateway") @mcp_server.list_tools() async def list_tools() -> list[Tool]: """列出可用的 MCP 工具""" - return [ + legacy_tools = [ Tool( name="search", description="语义搜索记忆和资源", @@ -135,12 +145,25 @@ async def list_tools() -> list[Tool]: }, ), ] + v1_tools = [ + Tool( + name=definition["name"], + description=definition["description"], + inputSchema=definition["inputSchema"], + ) + for definition in MEMORY_GATEWAY_MCP_TOOLS + ] + return legacy_tools + v1_tools @mcp_server.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """调用 MCP 工具""" try: + if name.startswith("memory_"): + result = await call_v1_memory_tool(name, arguments or {}) + return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, default=str))] + ov_client = await get_openviking_client() if name == "search": @@ -200,6 +223,60 @@ async def call_tool(name: str, arguments: Any) -> list[TextContent]: return [TextContent(type="text", text=f"Error: {str(e)}")] +async def call_v1_memory_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]: + """Dispatch v1 Memory Gateway MCP tools to the same service used by /v1.""" + if name == "memory_search": + return _jsonable(await 
v1_service.search_memory_with_openviking(MemorySearchRequest(**arguments))) + if name == "memory_upsert": + return v1_service.upsert_memory(MemoryUpsertRequest(**arguments)).model_dump(mode="json") + if name == "memory_append_episode": + return v1_service.append_episode(EpisodeAppendRequest(**arguments)).model_dump(mode="json") + if name == "memory_commit_session": + session_id = arguments.get("session_id") + if not session_id: + raise ValueError("session_id is required") + return _jsonable(v1_service.commit_session(session_id, CommitSessionRequest(**arguments))) + if name == "memory_get_profile": + return v1_service.get_profile(arguments["user_id"]).model_dump(mode="json") + if name == "memory_list_namespaces": + return { + "namespaces": [ + item.model_dump(mode="json") + for item in v1_service.list_namespaces( + AccessContext( + user_id=arguments["user_id"], + agent_id=arguments.get("agent_id"), + workspace_id=arguments.get("workspace_id"), + session_id=arguments.get("session_id"), + ) + ) + ] + } + if name == "memory_delete": + return v1_service.delete_memory( + arguments["memory_id"], + AccessContext( + user_id=arguments["user_id"], + agent_id=arguments.get("agent_id"), + workspace_id=arguments.get("workspace_id"), + session_id=arguments.get("session_id"), + ), + ) + if name == "memory_feedback": + return v1_service.add_feedback(arguments["memory_id"], MemoryFeedbackRequest(**arguments)) + raise ValueError(f"Unknown v1 memory tool: {name}") + + +def _jsonable(value: Any) -> Any: + if hasattr(value, "model_dump"): + return value.model_dump(mode="json") + if isinstance(value, list): + return [_jsonable(item) for item in value] + if isinstance(value, dict): + return {key: _jsonable(item) for key, item in value.items()} + return value + + @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" @@ -401,10 +478,12 @@ async def health_check(): try: ov_client = await get_openviking_client() ov_status = await ov_client.health_check() + evermemos_status = 
v1_service.evermemos_health() return { "status": "ok", "gateway": "memory-gateway", "openviking": ov_status, + "evermemos": evermemos_status, } except Exception as e: return { @@ -490,6 +569,12 @@ mcp_router.add_api_route("/rpc", mcp_rpc, methods=["POST"]) # 注册 MCP 路由 app.include_router(mcp_router, prefix="/mcp", tags=["mcp"]) +# Generic Memory Gateway v1 routes are imported lazily here to avoid changing +# the existing legacy /api and /mcp startup path. +from .api_v1 import router as api_v1_router # noqa: E402 + +app.include_router(api_v1_router) + @app.post("/api/search", dependencies=[Depends(verify_api_key)]) async def api_search(request: SearchRequest): diff --git a/memory_gateway/server_auth.py b/memory_gateway/server_auth.py new file mode 100644 index 0000000..3970a51 --- /dev/null +++ b/memory_gateway/server_auth.py @@ -0,0 +1,15 @@ +"""Small auth bridge used by the modular v1 router.""" +from __future__ import annotations + +from typing import Optional + +from fastapi import Header, HTTPException, status + +from .config import get_config + + +def verify_api_key_compat(x_api_key: Optional[str] = Header(default=None)) -> None: + expected_key = get_config().server.api_key + if expected_key and x_api_key != expected_key: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing API key") + diff --git a/memory_gateway/services.py b/memory_gateway/services.py new file mode 100644 index 0000000..07dee39 --- /dev/null +++ b/memory_gateway/services.py @@ -0,0 +1,369 @@ +"""Application services for the generic Memory Gateway v1 API.""" +from __future__ import annotations + +from datetime import datetime, timezone + +from fastapi import HTTPException, status + +from .config import get_config +from .evermemos_client import EverMemOSError, EverMemOSClient +from .namespace import can_access_memory, default_namespace_for_context, user_long_term_namespace, visible_namespaces +from .openviking_client import get_openviking_client +from 
.repositories import MetadataRepository, repository +from .schemas import ( + AccessContext, + AuditLog, + CommitSessionRequest, + CreateUserRequest, + EpisodeAppendRequest, + EpisodeRecord, + MemoryFeedbackRequest, + MemoryPatchRequest, + MemoryRecord, + MemorySearchRequest, + MemoryType, + MemoryUpsertRequest, + NamespaceInfo, + ProfileRecord, + SourceType, + UserRecord, + Visibility, +) +from .workers.evermemos_worker import EverMemOSWorker + + +class MemoryGatewayService: + def __init__(self, repo: MetadataRepository = repository, evermemos_client: EverMemOSClient | None = None) -> None: + self.repo = repo + self.evermemos_client = evermemos_client + + def create_user(self, request: CreateUserRequest) -> UserRecord: + user = UserRecord( + id=request.user_id or UserRecord(display_name=request.display_name).id, + display_name=request.display_name, + preferences=request.preferences, + ) + user.profile_namespace = f"user/{user.id}/profile" + self.repo.create_user(user) + self._audit("create_user", "user", user.id, namespace=user.profile_namespace, actor_user_id=user.id) + return user + + def get_user(self, user_id: str) -> UserRecord: + user = self.repo.get_user(user_id) + if not user: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") + return user + + def search_memory(self, request: MemorySearchRequest) -> dict: + ctx = AccessContext(**request.model_dump(include={"user_id", "agent_id", "workspace_id", "session_id"})) + query = request.query.lower().strip() + results = [] + for memory in self.repo.list_memories(): + if not can_access_memory(ctx, memory): + continue + if request.namespaces and memory.namespace not in request.namespaces: + continue + if request.memory_types and memory.memory_type not in request.memory_types: + continue + if request.tags and not set(request.tags).intersection(memory.tags): + continue + haystack = " ".join([memory.content, memory.summary or "", " ".join(memory.tags)]).lower() + if query and query 
not in haystack: + continue + score = self._score(memory, query) + results.append({"memory": memory, "score": score}) + results.sort(key=lambda item: item["score"], reverse=True) + return {"results": results[: request.limit], "total": len(results)} + + async def search_memory_with_openviking(self, request: MemorySearchRequest) -> dict: + """Search local metadata first, then fan out to OpenViking for visible namespaces.""" + ctx = AccessContext(**request.model_dump(include={"user_id", "agent_id", "workspace_id", "session_id"})) + local = self.search_memory(request) + visible = {namespace.namespace for namespace in visible_namespaces(ctx)} + requested = set(request.namespaces) if request.namespaces else visible + allowed_namespaces = sorted(requested.intersection(visible)) + + openviking_results = [] + if allowed_namespaces and request.query.strip(): + try: + ov_client = await get_openviking_client() + per_namespace_limit = max(1, min(request.limit, 10)) + for namespace in allowed_namespaces: + result = await ov_client.search( + query=request.query, + namespace=namespace, + limit=per_namespace_limit, + ) + for item in result.results: + item = dict(item) + item["namespace"] = namespace + item["source"] = "openviking" + openviking_results.append(item) + except Exception as exc: # noqa: BLE001 + self._audit( + "openviking_search_failed", + "search", + None, + actor_user_id=request.user_id, + actor_agent_id=request.agent_id, + metadata={"error": str(exc)}, + ) + + self._audit( + "memory_search", + "memory", + None, + actor_user_id=request.user_id, + actor_agent_id=request.agent_id, + metadata={"query": request.query, "namespaces": allowed_namespaces, "openviking_results": len(openviking_results)}, + ) + return { + "results": local["results"] + [{"openviking": item, "score": item.get("score", 0)} for item in openviking_results], + "total": local["total"] + len(openviking_results), + "local_total": local["total"], + "openviking_total": len(openviking_results), + 
"searched_namespaces": allowed_namespaces, + } + + def upsert_memory(self, request: MemoryUpsertRequest) -> MemoryRecord: + ctx = AccessContext(**request.model_dump(include={"user_id", "agent_id", "workspace_id", "session_id"})) + namespace = request.namespace or default_namespace_for_context(ctx, request.visibility) + memory = MemoryRecord( + user_id=request.user_id, + agent_id=request.agent_id, + workspace_id=request.workspace_id, + session_id=request.session_id, + namespace=namespace, + memory_type=request.memory_type, + content=request.content, + summary=request.summary, + tags=request.tags, + importance=request.importance, + confidence=request.confidence, + visibility=request.visibility, + source=request.source, + expires_at=request.expires_at, + ) + self.repo.upsert_memory(memory) + self._audit("upsert_memory", "memory", memory.id, namespace=memory.namespace, actor_user_id=request.user_id, actor_agent_id=request.agent_id) + return memory + + def get_memory(self, memory_id: str, ctx: AccessContext) -> MemoryRecord: + memory = self.repo.get_memory(memory_id) + if not memory: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Memory not found") + if not can_access_memory(ctx, memory): + self._audit("get_memory", "memory", memory_id, namespace=memory.namespace, actor_user_id=ctx.user_id, actor_agent_id=ctx.agent_id, decision="deny") + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Memory access denied") + return memory + + def patch_memory(self, memory_id: str, ctx: AccessContext, patch: MemoryPatchRequest) -> MemoryRecord: + memory = self.get_memory(memory_id, ctx) + updates = patch.model_dump(exclude_unset=True) + for key, value in updates.items(): + setattr(memory, key, value) + memory.updated_at = datetime.now(timezone.utc) + memory.version += 1 + self.repo.upsert_memory(memory) + self._audit("patch_memory", "memory", memory.id, namespace=memory.namespace, actor_user_id=ctx.user_id, actor_agent_id=ctx.agent_id) + return 
memory + + def delete_memory(self, memory_id: str, ctx: AccessContext) -> dict: + memory = self.get_memory(memory_id, ctx) + deleted = self.repo.delete_memory(memory_id) + self._audit("delete_memory", "memory", memory_id, namespace=memory.namespace, actor_user_id=ctx.user_id, actor_agent_id=ctx.agent_id) + return {"deleted": deleted, "id": memory_id} + + def append_episode(self, request: EpisodeAppendRequest) -> EpisodeRecord: + ctx = AccessContext(**request.model_dump(include={"user_id", "agent_id", "workspace_id", "session_id"})) + episode = EpisodeRecord( + user_id=request.user_id, + agent_id=request.agent_id, + workspace_id=request.workspace_id, + session_id=request.session_id or "default", + namespace=request.namespace or default_namespace_for_context(ctx, Visibility.PRIVATE), + content=request.content, + events=request.events, + tags=request.tags, + source=request.source, + expires_at=request.expires_at, + ) + self.repo.append_episode(episode) + self._audit("append_episode", "episode", episode.id, namespace=episode.namespace, actor_user_id=request.user_id, actor_agent_id=request.agent_id) + return episode + + def commit_session(self, session_id: str, request: CommitSessionRequest) -> dict: + episodes = self.repo.list_session_episodes(session_id) + backend = "disabled" + error: str | None = None + if request.promote: + ctx = AccessContext( + user_id=request.user_id, + agent_id=request.agent_id, + workspace_id=request.workspace_id, + session_id=session_id, + ) + target_namespace = request.target_namespace or user_long_term_namespace(request.user_id) + config = get_config().evermemos + if config.enabled: + try: + external_result = (self.evermemos_client or EverMemOSClient()).consolidate_session( + session_id=session_id, + ctx=ctx, + episodes=episodes, + existing_memories=list(self.repo.list_memories()), + min_importance=request.min_importance, + target_namespace=target_namespace, + ) + result = self._persist_external_consolidation(external_result, ctx, 
session_id) + backend = "external" + except EverMemOSError as exc: + error = str(exc) + if not config.fallback_to_local: + self._audit( + "evermemos_commit_failed", + "session", + session_id, + actor_user_id=request.user_id, + actor_agent_id=request.agent_id, + decision="deny", + metadata={"error": error}, + ) + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=f"EverMemOS failed: {error}") from exc + result = self._commit_session_locally(session_id, ctx, request) + backend = "local-fallback" + else: + result = self._commit_session_locally(session_id, ctx, request) + backend = "local-disabled" + else: + result = None + self._audit("commit_session", "session", session_id, actor_user_id=request.user_id, actor_agent_id=request.agent_id) + if not result: + return {"session_id": session_id, "episodes": len(episodes), "promoted": [], "evermemos_backend": backend} + return { + "evermemos_backend": backend, + "evermemos_error": error, + "session_id": session_id, + "episodes": result.episodes, + "candidates": result.candidates, + "promoted": result.promoted, + "duplicates": result.duplicates, + "conflicts": result.conflicts, + "review_drafts": result.review_drafts, + } + + def evermemos_health(self) -> dict: + config = get_config().evermemos + if not config.enabled: + return {"status": "disabled", "url": config.url} + return (self.evermemos_client or EverMemOSClient()).health() + + def _commit_session_locally(self, session_id: str, ctx: AccessContext, request: CommitSessionRequest): + worker = EverMemOSWorker(self.repo) + return worker.consolidate_session( + session_id=session_id, + ctx=ctx, + min_importance=request.min_importance, + target_namespace=request.target_namespace or user_long_term_namespace(request.user_id), + ) + + def _persist_external_consolidation(self, external_result: dict, ctx: AccessContext, session_id: str): + from .workers.evermemos_worker import ConsolidationResult + + result = ConsolidationResult( + session_id=session_id, + 
episodes=external_result.get("episodes") or len(self.repo.list_session_episodes(session_id)), + duplicates=external_result.get("duplicates", []), + conflicts=external_result.get("conflicts", []), + review_drafts=external_result.get("review_drafts", []), + ) + for item in external_result.get("candidates", []): + memory = self._memory_from_external(item, ctx, session_id) + if memory: + result.candidates.append(memory) + for item in external_result.get("promoted", []): + memory = self._memory_from_external(item, ctx, session_id) + if memory: + self.repo.upsert_memory(memory) + result.promoted.append(memory) + if all(candidate.id != memory.id for candidate in result.candidates): + result.candidates.append(memory) + return result + + def _memory_from_external(self, item: dict, ctx: AccessContext, session_id: str) -> MemoryRecord | None: + if not isinstance(item, dict): + return None + data = dict(item) + data.setdefault("user_id", ctx.user_id) + data.setdefault("agent_id", ctx.agent_id) + data.setdefault("workspace_id", ctx.workspace_id) + data.setdefault("session_id", session_id) + data.setdefault("namespace", default_namespace_for_context(ctx, Visibility.PRIVATE)) + data.setdefault("memory_type", MemoryType.SUMMARY.value) + data.setdefault("content", data.get("text") or data.get("summary") or "") + data.setdefault("summary", data.get("content", "")[:180]) + data.setdefault("tags", ["evermemos-external"]) + data.setdefault("importance", 0.7) + data.setdefault("confidence", 0.65) + data.setdefault("visibility", Visibility.PRIVATE.value) + data.setdefault("source", SourceType.EVERMEMOS.value) + if not data["content"]: + return None + return MemoryRecord.model_validate(data) + + def get_profile(self, user_id: str) -> ProfileRecord: + profile = self.repo.get_profile(user_id) + if not profile: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Profile not found") + return profile + + def add_feedback(self, memory_id: str, request: MemoryFeedbackRequest) -> 
dict: + ctx = AccessContext(**request.model_dump(include={"user_id", "agent_id", "workspace_id", "session_id"})) + memory = self.get_memory(memory_id, ctx) + self._audit( + f"feedback:{request.feedback}", + "memory", + memory.id, + namespace=memory.namespace, + actor_user_id=request.user_id, + actor_agent_id=request.agent_id, + metadata={"comment": request.comment}, + ) + return {"status": "ok", "memory_id": memory_id, "feedback": request.feedback} + + def list_namespaces(self, ctx: AccessContext) -> list[NamespaceInfo]: + return visible_namespaces(ctx) + + def list_audit(self, limit: int = 100) -> list[AuditLog]: + return self.repo.list_audit(limit) + + def _score(self, memory: MemoryRecord, query: str) -> float: + lexical = 1.0 if query and query in memory.content.lower() else 0.2 + return lexical + memory.importance + memory.confidence + + def _audit( + self, + action: str, + target_type: str, + target_id: str | None, + namespace: str | None = None, + actor_user_id: str | None = None, + actor_agent_id: str | None = None, + decision: str = "allow", + metadata: dict | None = None, + ) -> None: + self.repo.add_audit( + AuditLog( + actor_user_id=actor_user_id, + actor_agent_id=actor_agent_id, + action=action, + target_type=target_type, + target_id=target_id, + namespace=namespace, + decision=decision, # type: ignore[arg-type] + metadata=metadata or {}, + ) + ) + + +service = MemoryGatewayService() diff --git a/memory_gateway/skills/__init__.py b/memory_gateway/skills/__init__.py new file mode 100644 index 0000000..072e22b --- /dev/null +++ b/memory_gateway/skills/__init__.py @@ -0,0 +1,2 @@ +"""Skill skeletons for Memory Gateway processing units.""" + diff --git a/memory_gateway/skills/base.py b/memory_gateway/skills/base.py new file mode 100644 index 0000000..a028047 --- /dev/null +++ b/memory_gateway/skills/base.py @@ -0,0 +1,21 @@ +"""Shared skill contracts.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any 
+ + +@dataclass +class SkillResult: + status: str + output: dict[str, Any] = field(default_factory=dict) + writes_long_term_memory: bool = False + + +class MemorySkill: + name = "memory_skill" + writes_long_term_memory = False + + async def run(self, payload: dict[str, Any]) -> SkillResult: + raise NotImplementedError + diff --git a/memory_gateway/skills/classify_memory_skill.py b/memory_gateway/skills/classify_memory_skill.py new file mode 100644 index 0000000..cbb7d74 --- /dev/null +++ b/memory_gateway/skills/classify_memory_skill.py @@ -0,0 +1,9 @@ +from .base import MemorySkill, SkillResult + + +class ClassifyMemorySkill(MemorySkill): + name = "classify_memory_skill" + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"memory_type": payload.get("memory_type", "fact"), "visibility": payload.get("visibility", "private")}) + diff --git a/memory_gateway/skills/commit_memory_skill.py b/memory_gateway/skills/commit_memory_skill.py new file mode 100644 index 0000000..13fb405 --- /dev/null +++ b/memory_gateway/skills/commit_memory_skill.py @@ -0,0 +1,10 @@ +from .base import MemorySkill, SkillResult + + +class CommitMemorySkill(MemorySkill): + name = "commit_memory_skill" + writes_long_term_memory = True + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"committed": payload}, writes_long_term_memory=True) + diff --git a/memory_gateway/skills/export_to_obsidian_skill.py b/memory_gateway/skills/export_to_obsidian_skill.py new file mode 100644 index 0000000..cbd2204 --- /dev/null +++ b/memory_gateway/skills/export_to_obsidian_skill.py @@ -0,0 +1,9 @@ +from .base import MemorySkill, SkillResult + + +class ExportToObsidianSkill(MemorySkill): + name = "export_to_obsidian_skill" + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"draft_path": payload.get("draft_path")}) + diff --git a/memory_gateway/skills/extract_memory_skill.py 
b/memory_gateway/skills/extract_memory_skill.py new file mode 100644 index 0000000..2a0692c --- /dev/null +++ b/memory_gateway/skills/extract_memory_skill.py @@ -0,0 +1,11 @@ +from .base import MemorySkill, SkillResult + + +class ExtractMemorySkill(MemorySkill): + name = "extract_memory_skill" + + async def run(self, payload: dict) -> SkillResult: + text = payload.get("content", "") + candidates = [{"content": text, "confidence": 0.5}] if text else [] + return SkillResult(status="ok", output={"candidates": candidates}) + diff --git a/memory_gateway/skills/import_from_obsidian_skill.py b/memory_gateway/skills/import_from_obsidian_skill.py new file mode 100644 index 0000000..c30e80b --- /dev/null +++ b/memory_gateway/skills/import_from_obsidian_skill.py @@ -0,0 +1,10 @@ +from .base import MemorySkill, SkillResult + + +class ImportFromObsidianSkill(MemorySkill): + name = "import_from_obsidian_skill" + writes_long_term_memory = True + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"imported_path": payload.get("path")}, writes_long_term_memory=True) + diff --git a/memory_gateway/skills/ingest_skill.py b/memory_gateway/skills/ingest_skill.py new file mode 100644 index 0000000..58dabdb --- /dev/null +++ b/memory_gateway/skills/ingest_skill.py @@ -0,0 +1,9 @@ +from .base import MemorySkill, SkillResult + + +class IngestSkill(MemorySkill): + name = "ingest_skill" + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"normalized": payload}) + diff --git a/memory_gateway/skills/merge_memory_skill.py b/memory_gateway/skills/merge_memory_skill.py new file mode 100644 index 0000000..3199322 --- /dev/null +++ b/memory_gateway/skills/merge_memory_skill.py @@ -0,0 +1,10 @@ +from .base import MemorySkill, SkillResult + + +class MergeMemorySkill(MemorySkill): + name = "merge_memory_skill" + writes_long_term_memory = True + + async def run(self, payload: dict) -> SkillResult: + return 
SkillResult(status="ok", output={"merged": payload.get("memory_ids", [])}, writes_long_term_memory=True) + diff --git a/memory_gateway/skills/prune_memory_skill.py b/memory_gateway/skills/prune_memory_skill.py new file mode 100644 index 0000000..0554b74 --- /dev/null +++ b/memory_gateway/skills/prune_memory_skill.py @@ -0,0 +1,10 @@ +from .base import MemorySkill, SkillResult + + +class PruneMemorySkill(MemorySkill): + name = "prune_memory_skill" + writes_long_term_memory = True + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"pruned": payload.get("memory_ids", [])}, writes_long_term_memory=True) + diff --git a/memory_gateway/skills/retrieve_context_skill.py b/memory_gateway/skills/retrieve_context_skill.py new file mode 100644 index 0000000..5e3dcc0 --- /dev/null +++ b/memory_gateway/skills/retrieve_context_skill.py @@ -0,0 +1,9 @@ +from .base import MemorySkill, SkillResult + + +class RetrieveContextSkill(MemorySkill): + name = "retrieve_context_skill" + + async def run(self, payload: dict) -> SkillResult: + return SkillResult(status="ok", output={"query": payload.get("query"), "contexts": []}) + diff --git a/memory_gateway/skills/summarize_episode_skill.py b/memory_gateway/skills/summarize_episode_skill.py new file mode 100644 index 0000000..d6e0ad0 --- /dev/null +++ b/memory_gateway/skills/summarize_episode_skill.py @@ -0,0 +1,10 @@ +from .base import MemorySkill, SkillResult + + +class SummarizeEpisodeSkill(MemorySkill): + name = "summarize_episode_skill" + + async def run(self, payload: dict) -> SkillResult: + content = payload.get("content", "") + return SkillResult(status="ok", output={"summary": content[:500]}) + diff --git a/memory_gateway/types.py b/memory_gateway/types.py index 3dc512b..33dbeb8 100644 --- a/memory_gateway/types.py +++ b/memory_gateway/types.py @@ -17,6 +17,17 @@ class OpenVikingConfig(BaseModel): timeout: int = 30 +class EverMemOSConfig(BaseModel): + """External EverMemOS consolidation 
service configuration.""" + enabled: bool = True + url: str = "http://127.0.0.1:1995" + api_key: str = "" + timeout: int = 30 + health_path: str = "/health" + consolidate_path: str = "/v1/sessions/consolidate" + fallback_to_local: bool = True + + class MemoryConfig(BaseModel): """记忆配置""" default_namespace: str = "memory-gateway" @@ -36,6 +47,13 @@ class ObsidianConfig(BaseModel): """Obsidian Vault 配置。""" vault_path: str = "/home/tom/memory-gateway/obsidian-vault" knowledge_dir: str = "01_Knowledge/Uploaded" + review_dir: str = "Reviews/Queue" + + +class StorageConfig(BaseModel): + """Metadata storage configuration.""" + backend: Literal["sqlite", "memory"] = "sqlite" + sqlite_path: str = "/home/tom/memory-gateway/memory_gateway.sqlite3" class LoggingConfig(BaseModel): @@ -48,10 +66,12 @@ class Config(BaseModel): """完整配置""" server: ServerConfig = Field(default_factory=ServerConfig) openviking: OpenVikingConfig = Field(default_factory=OpenVikingConfig) + evermemos: EverMemOSConfig = Field(default_factory=EverMemOSConfig) memory: MemoryConfig = Field(default_factory=MemoryConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig) llm: LLMConfig = Field(default_factory=LLMConfig) obsidian: ObsidianConfig = Field(default_factory=ObsidianConfig) + storage: StorageConfig = Field(default_factory=StorageConfig) class SearchRequest(BaseModel): diff --git a/memory_gateway/workers/__init__.py b/memory_gateway/workers/__init__.py new file mode 100644 index 0000000..be8d2d5 --- /dev/null +++ b/memory_gateway/workers/__init__.py @@ -0,0 +1,2 @@ +"""Background worker skeletons.""" + diff --git a/memory_gateway/workers/evermemos_worker.py b/memory_gateway/workers/evermemos_worker.py new file mode 100644 index 0000000..3b1ef08 --- /dev/null +++ b/memory_gateway/workers/evermemos_worker.py @@ -0,0 +1,186 @@ +"""Minimal EverMemOS-style consolidation worker. + +This worker is deliberately deterministic for the POC. 
It extracts stable +candidate memories from session episodes, deduplicates them against existing +records, promotes eligible records, and sends high-risk/high-value candidates +to Obsidian review rather than blindly polluting long-term memory. +""" +from __future__ import annotations + +import hashlib +import re +from dataclasses import dataclass, field + +from memory_gateway.namespace import default_namespace_for_context +from memory_gateway.obsidian_review import write_review_draft +from memory_gateway.repositories import MetadataRepository +from memory_gateway.schemas import ( + AccessContext, + EpisodeRecord, + MemoryRecord, + MemoryType, + SourceType, + Visibility, +) + + +_SENTENCE_RE = re.compile(r"(?<=[。!?.!?])\s+|\n+") +_NOISE_RE = re.compile(r"\s+") + + +@dataclass +class ConsolidationResult: + session_id: str + episodes: int + candidates: list[MemoryRecord] = field(default_factory=list) + promoted: list[MemoryRecord] = field(default_factory=list) + duplicates: list[dict] = field(default_factory=list) + review_drafts: list[str] = field(default_factory=list) + conflicts: list[dict] = field(default_factory=list) + + +class EverMemOSWorker: + def __init__(self, repo: MetadataRepository) -> None: + self.repo = repo + + def consolidate_session( + self, + session_id: str, + ctx: AccessContext, + min_importance: float = 0.6, + target_namespace: str | None = None, + ) -> ConsolidationResult: + episodes = self.repo.list_session_episodes(session_id) + result = ConsolidationResult(session_id=session_id, episodes=len(episodes)) + existing = list(self.repo.list_memories()) + seen_fingerprints = {self._fingerprint(memory.content): memory for memory in existing} + + for episode in episodes: + for candidate in self._extract_candidates(episode, ctx, min_importance, target_namespace): + result.candidates.append(candidate) + fingerprint = self._fingerprint(candidate.content) + duplicate = seen_fingerprints.get(fingerprint) + if duplicate: + 
result.duplicates.append({"candidate_id": candidate.id, "existing_id": duplicate.id}) + continue + + conflict_ids = self._find_conflicts(candidate, existing) + if conflict_ids: + draft = write_review_draft(candidate, reason="conflict", conflict_ids=conflict_ids) + result.review_drafts.append(str(draft)) + result.conflicts.append({"candidate_id": candidate.id, "conflict_ids": conflict_ids}) + continue + + if candidate.importance >= 0.85: + draft = write_review_draft(candidate, reason="high_value") + result.review_drafts.append(str(draft)) + continue + + if candidate.importance >= min_importance and candidate.confidence >= 0.55: + self.repo.upsert_memory(candidate) + result.promoted.append(candidate) + seen_fingerprints[fingerprint] = candidate + existing.append(candidate) + + return result + + def _extract_candidates( + self, + episode: EpisodeRecord, + ctx: AccessContext, + min_importance: float, + target_namespace: str | None, + ) -> list[MemoryRecord]: + text = episode.summary or episode.content + parts = [self._normalize(part) for part in _SENTENCE_RE.split(text) if self._normalize(part)] + candidates: list[MemoryRecord] = [] + for part in parts: + if len(part) < 20: + continue + memory_type = self._classify_type(part, episode.tags) + importance = self._estimate_importance(part, episode.tags, min_importance) + confidence = 0.65 if episode.summary else 0.58 + visibility = Visibility.WORKSPACE_SHARED if "workspace" in episode.tags and ctx.workspace_id else Visibility.PRIVATE + memory_ctx = AccessContext( + user_id=ctx.user_id, + agent_id=ctx.agent_id, + workspace_id=ctx.workspace_id, + session_id=ctx.session_id, + ) + candidates.append( + MemoryRecord( + user_id=ctx.user_id, + agent_id=ctx.agent_id, + workspace_id=ctx.workspace_id, + session_id=episode.session_id, + namespace=target_namespace or default_namespace_for_context(memory_ctx, visibility), + memory_type=memory_type, + content=part, + summary=part[:180], + tags=list(set(episode.tags + 
["promoted-from-session", "evermemos-candidate"])), + importance=importance, + confidence=confidence, + visibility=visibility, + source=SourceType.EVERMEMOS, + source_ref=episode.id, + ) + ) + return candidates + + def _classify_type(self, text: str, tags: list[str]) -> MemoryType: + lowered = text.lower() + if "preference" in tags or "偏好" in text: + return MemoryType.PREFERENCE + if "decision" in tags or "决定" in text or "决策" in text: + return MemoryType.DECISION + if "procedure" in tags or "步骤" in text or "流程" in text: + return MemoryType.PROCEDURE + if "经验" in text or "worked" in lowered or "failed" in lowered: + return MemoryType.EXPERIENCE + return MemoryType.SUMMARY + + def _estimate_importance(self, text: str, tags: list[str], min_importance: float) -> float: + importance = max(min_importance, 0.6) + signal_words = ["必须", "不要", "偏好", "长期", "决策", "结论", "重要", "preference", "decision", "must"] + if any(word in text.lower() for word in signal_words): + importance += 0.15 + if "review" in tags or "high-value" in tags: + importance += 0.2 + return min(1.0, importance) + + def _find_conflicts(self, candidate: MemoryRecord, existing: list[MemoryRecord]) -> list[str]: + candidate_text = candidate.content.lower() + negation_signals = ["不要", "不再", "禁止", "not ", "never", "disable"] + positive_signals = ["需要", "必须", "启用", "prefer", "always", "enable"] + has_negative = any(signal in candidate_text for signal in negation_signals) + has_positive = any(signal in candidate_text for signal in positive_signals) + if not has_negative and not has_positive: + return [] + + candidate_tokens = self._tokens(candidate.content) + conflicts = [] + for memory in existing: + if memory.user_id != candidate.user_id: + continue + if memory.memory_type != candidate.memory_type: + continue + overlap = candidate_tokens.intersection(self._tokens(memory.content)) + if len(overlap) < 2: + continue + memory_text = memory.content.lower() + memory_negative = any(signal in memory_text for signal in 
negation_signals) + memory_positive = any(signal in memory_text for signal in positive_signals) + if has_negative != memory_negative or has_positive != memory_positive: + conflicts.append(memory.id) + return conflicts + + def _tokens(self, text: str) -> set[str]: + return {token for token in re.split(r"[^a-zA-Z0-9\u4e00-\u9fff]+", text.lower()) if len(token) >= 2} + + def _normalize(self, text: str) -> str: + return _NOISE_RE.sub(" ", text).strip(" -_*#\t") + + def _fingerprint(self, text: str) -> str: + normalized = self._normalize(text).lower() + return hashlib.sha1(normalized.encode("utf-8")).hexdigest() + diff --git a/obsidian-vault/05_Templates/agent-experience-template.md b/obsidian-vault/05_Templates/agent-experience-template.md new file mode 100644 index 0000000..291b797 --- /dev/null +++ b/obsidian-vault/05_Templates/agent-experience-template.md @@ -0,0 +1,22 @@ +--- +type: agent_experience +agent_id: +visibility: agent-only +tags: + - memory/agent-experience +--- + +# Agent Experience - {{agent_id}} + +## What Worked + +- + +## What Failed + +- + +## Tooling Notes + +- + diff --git a/obsidian-vault/05_Templates/long-term-memory-template.md b/obsidian-vault/05_Templates/long-term-memory-template.md new file mode 100644 index 0000000..6d5636a --- /dev/null +++ b/obsidian-vault/05_Templates/long-term-memory-template.md @@ -0,0 +1,33 @@ +--- +type: long_term_memory +memory_id: +user_id: +workspace_id: +visibility: private +importance: +confidence: +source: +tags: + - memory/long-term +--- + +# {{summary}} + +## Memory + + +## Context + + +## Evidence + +- Source: +- Created: +- Version: + +## Review + +- Status: pending +- Reviewer: +- Decision: + diff --git a/obsidian-vault/05_Templates/review-queue-template.md b/obsidian-vault/05_Templates/review-queue-template.md new file mode 100644 index 0000000..c02ec6c --- /dev/null +++ b/obsidian-vault/05_Templates/review-queue-template.md @@ -0,0 +1,23 @@ +--- +type: memory_review +review_status: pending +tags: + - 
memory/review +--- + +# Memory Review - {{memory_id}} + +## Candidate + + +## Proposed Action + +- [ ] Accept +- [ ] Edit +- [ ] Reject +- [ ] Merge +- [ ] Archive + +## Reason + + diff --git a/obsidian-vault/05_Templates/user-profile-template.md b/obsidian-vault/05_Templates/user-profile-template.md new file mode 100644 index 0000000..fe30931 --- /dev/null +++ b/obsidian-vault/05_Templates/user-profile-template.md @@ -0,0 +1,28 @@ +--- +type: user_profile +user_id: +visibility: private +tags: + - memory/profile + - visibility/private +--- + +# User Profile - {{user_id}} + +## Stable Facts + +- + +## Preferences + +- + +## Working Style + +- + +## Evidence + +| Memory ID | Evidence | Confidence | Updated | +|---|---|---:|---| + diff --git a/obsidian-vault/05_Templates/workspace-memory-template.md b/obsidian-vault/05_Templates/workspace-memory-template.md new file mode 100644 index 0000000..3aa52d1 --- /dev/null +++ b/obsidian-vault/05_Templates/workspace-memory-template.md @@ -0,0 +1,23 @@ +--- +type: workspace_memory +workspace_id: +visibility: workspace-shared +tags: + - memory/workspace + - visibility/workspace-shared +--- + +# Workspace Memory - {{workspace_id}} + +## Shared Decisions + +- + +## Project Knowledge + +- + +## Reusable Context + +- + diff --git a/tests/test_evermemos_service.py b/tests/test_evermemos_service.py new file mode 100644 index 0000000..2db3a0c --- /dev/null +++ b/tests/test_evermemos_service.py @@ -0,0 +1,53 @@ +import asyncio + +from memory_gateway.evermemos_service import ConsolidateRequest, consolidate_session + + +def test_evermemos_service_consolidates_session(monkeypatch, tmp_path): + monkeypatch.setattr( + "memory_gateway.obsidian_review.get_config", + lambda: type( + "Config", + (), + { + "obsidian": type( + "Obsidian", + (), + {"vault_path": str(tmp_path / "vault"), "review_dir": "Reviews/Queue"}, + )() + }, + )(), + ) + payload = { + "session_id": "sess_service", + "context": {"user_id": "user_a", "agent_id": "agent_a", 
"workspace_id": "ws_a", "session_id": "sess_service"}, + "episodes": [ + { + "user_id": "user_a", + "agent_id": "agent_a", + "workspace_id": "ws_a", + "session_id": "sess_service", + "namespace": "session/sess_service/episodic", + "content": "结论:EverMemOS 本地服务负责整理稳定长期记忆。", + "tags": ["decision"], + }, + { + "user_id": "user_a", + "agent_id": "agent_a", + "workspace_id": "ws_a", + "session_id": "sess_service", + "namespace": "session/sess_service/episodic", + "content": "重要:高价值记忆应该进入 Obsidian review queue。", + "tags": ["review", "high-value"], + }, + ], + } + + response = asyncio.run(consolidate_session(ConsolidateRequest.model_validate(payload))) + + assert response["status"] == "ok" + result = response["result"] + assert result["episodes"] == 2 + assert len(result["candidates"]) == 2 + assert len(result["promoted"]) == 1 + assert len(result["review_drafts"]) == 1 diff --git a/tests/test_server.py b/tests/test_server.py index 508ab8a..8a5f5c7 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -160,8 +160,9 @@ def test_mcp_rpc_lists_tools_with_api_key(monkeypatch): assert response.status_code == 200 payload = response.json() assert payload["jsonrpc"] == "2.0" - assert len(payload["result"]["tools"]) == 7 + assert len(payload["result"]["tools"]) >= 7 assert any(tool["name"] == "commit_summary" for tool in payload["result"]["tools"]) + assert any(tool["name"] == "memory_search" for tool in payload["result"]["tools"]) def test_search_passes_through_gateway(monkeypatch): diff --git a/tests/test_v1_mcp.py b/tests/test_v1_mcp.py new file mode 100644 index 0000000..db735a1 --- /dev/null +++ b/tests/test_v1_mcp.py @@ -0,0 +1,29 @@ +import asyncio + +from memory_gateway.repositories import InMemoryRepository +from memory_gateway.services import MemoryGatewayService + + +def test_v1_mcp_tools_are_exposed_and_dispatch(monkeypatch): + import memory_gateway.server as server + + service = MemoryGatewayService(InMemoryRepository()) + monkeypatch.setattr(server, 
"v1_service", service) + + tools = asyncio.run(server.list_tools()) + assert any(tool.name == "memory_search" for tool in tools) + assert any(tool.name == "memory_commit_session" for tool in tools) + + result = asyncio.run( + server.call_v1_memory_tool( + "memory_upsert", + { + "user_id": "user_a", + "content": "MCP 写入的 v1 memory", + "visibility": "private", + }, + ) + ) + + assert result["user_id"] == "user_a" + assert result["namespace"] == "user/user_a/long_term" diff --git a/tests/test_v1_service.py b/tests/test_v1_service.py new file mode 100644 index 0000000..f2d2018 --- /dev/null +++ b/tests/test_v1_service.py @@ -0,0 +1,185 @@ +import asyncio + +from memory_gateway.repositories import InMemoryRepository, SQLiteRepository +from memory_gateway.schemas import ( + AccessContext, + CommitSessionRequest, + CreateUserRequest, + EpisodeAppendRequest, + MemorySearchRequest, + MemoryUpsertRequest, + Visibility, +) +from memory_gateway.services import MemoryGatewayService +from memory_gateway.types import Config, EverMemOSConfig, ObsidianConfig + + +def test_private_memory_is_isolated_by_user(): + service = MemoryGatewayService(InMemoryRepository()) + service.create_user(CreateUserRequest(user_id="user_a", display_name="A")) + service.create_user(CreateUserRequest(user_id="user_b", display_name="B")) + + memory = service.upsert_memory( + MemoryUpsertRequest( + user_id="user_a", + content="用户 A 的私有偏好是中文输出", + visibility=Visibility.PRIVATE, + ) + ) + + own_results = service.search_memory(MemorySearchRequest(user_id="user_a", query="中文")) + other_results = service.search_memory(MemorySearchRequest(user_id="user_b", query="中文")) + + assert own_results["total"] == 1 + assert own_results["results"][0]["memory"].id == memory.id + assert other_results["total"] == 0 + + +def test_workspace_memory_requires_matching_workspace(): + service = MemoryGatewayService(InMemoryRepository()) + memory = service.upsert_memory( + MemoryUpsertRequest( + user_id="user_a", + 
workspace_id="ws_1", + content="workspace 共享的项目决策", + visibility=Visibility.WORKSPACE_SHARED, + ) + ) + + visible = service.get_memory(memory.id, AccessContext(user_id="user_b", workspace_id="ws_1")) + assert visible.id == memory.id + + hidden = service.search_memory(MemorySearchRequest(user_id="user_b", workspace_id="ws_2", query="项目决策")) + assert hidden["total"] == 0 + + +def test_sqlite_repository_persists_memory(tmp_path): + db_path = tmp_path / "memory_gateway.sqlite3" + repo = SQLiteRepository(db_path) + service = MemoryGatewayService(repo) + + service.create_user(CreateUserRequest(user_id="user_a", display_name="A")) + memory = service.upsert_memory(MemoryUpsertRequest(user_id="user_a", content="持久化 SQLite memory")) + + reloaded_service = MemoryGatewayService(SQLiteRepository(db_path)) + reloaded = reloaded_service.get_memory(memory.id, AccessContext(user_id="user_a")) + + assert reloaded.content == "持久化 SQLite memory" + + +def test_commit_session_promotes_dedupes_and_creates_review_draft(monkeypatch, tmp_path): + monkeypatch.setattr( + "memory_gateway.services.get_config", + lambda: Config(evermemos=EverMemOSConfig(enabled=False)), + ) + monkeypatch.setattr( + "memory_gateway.obsidian_review.get_config", + lambda: Config(obsidian=ObsidianConfig(vault_path=str(tmp_path / "vault"), review_dir="Reviews/Queue")), + ) + service = MemoryGatewayService(InMemoryRepository()) + service.append_episode( + EpisodeAppendRequest( + user_id="user_a", + session_id="sess_1", + content="结论:这个项目必须保留用户隔离和 namespace ACL。", + tags=["decision"], + ) + ) + service.append_episode( + EpisodeAppendRequest( + user_id="user_a", + session_id="sess_1", + content="重要:这条高价值记忆需要人工 review 后再进入长期记忆。", + tags=["review", "high-value"], + ) + ) + + result = service.commit_session( + "sess_1", + CommitSessionRequest( + user_id="user_a", + session_id="sess_1", + min_importance=0.6, + ), + ) + + assert len(result["promoted"]) == 1 + assert result["evermemos_backend"] == "local-disabled" + assert 
len(result["review_drafts"]) == 1 + assert (tmp_path / "vault" / "Reviews" / "Queue").exists() + + +def test_commit_session_uses_external_evermemos(monkeypatch): + monkeypatch.setattr( + "memory_gateway.services.get_config", + lambda: Config(evermemos=EverMemOSConfig(enabled=True, fallback_to_local=False)), + ) + + class FakeEverMemOSClient: + def consolidate_session(self, **kwargs): + return { + "episodes": 1, + "candidates": [], + "promoted": [ + { + "content": "外部 EverMemOS 总结出的长期记忆", + "summary": "外部 EverMemOS 长期记忆", + "memory_type": "summary", + "tags": ["external-evermemos"], + } + ], + "duplicates": [], + "conflicts": [], + "review_drafts": [], + } + + def health(self): + return {"status": "ok"} + + service = MemoryGatewayService(InMemoryRepository(), evermemos_client=FakeEverMemOSClient()) + service.append_episode( + EpisodeAppendRequest( + user_id="user_a", + session_id="sess_external", + content="这条 episode 应该交给外部 EverMemOS。", + ) + ) + result = service.commit_session( + "sess_external", + CommitSessionRequest(user_id="user_a", session_id="sess_external"), + ) + + assert result["evermemos_backend"] == "external" + assert len(result["promoted"]) == 1 + search = service.search_memory(MemorySearchRequest(user_id="user_a", query="外部 EverMemOS")) + assert search["total"] == 1 + + +def test_search_fans_out_to_openviking_after_namespace_acl(monkeypatch): + service = MemoryGatewayService(InMemoryRepository()) + + class FakeSearchResult: + results = [{"uri": "viking://user/user_a/long_term/demo", "abstract": "OpenViking result", "score": 0.9}] + + class FakeOpenVikingClient: + async def search(self, query, namespace=None, limit=None, uri=None): + assert namespace == "user/user_a/long_term" + return FakeSearchResult() + + async def fake_get_openviking_client(): + return FakeOpenVikingClient() + + monkeypatch.setattr("memory_gateway.services.get_openviking_client", fake_get_openviking_client) + + result = asyncio.run( + service.search_memory_with_openviking( + 
MemorySearchRequest( + user_id="user_a", + query="demo", + namespaces=["user/user_a/long_term", "user/user_b/long_term"], + ) + ) + ) + + assert result["openviking_total"] == 1 + assert result["searched_namespaces"] == ["user/user_a/long_term"]