# Files
# memory-gateway/tests/test_v1_service.py
# 2026-05-05 16:18:31 +08:00
#
# 186 lines
# 6.4 KiB
# Python

import asyncio
from memory_gateway.repositories import InMemoryRepository, SQLiteRepository
from memory_gateway.schemas import (
AccessContext,
CommitSessionRequest,
CreateUserRequest,
EpisodeAppendRequest,
MemorySearchRequest,
MemoryUpsertRequest,
Visibility,
)
from memory_gateway.services import MemoryGatewayService
from memory_gateway.types import Config, EverMemOSConfig, ObsidianConfig
def test_private_memory_is_isolated_by_user():
    """A PRIVATE memory shows up in its owner's search and not in another user's."""
    gateway = MemoryGatewayService(InMemoryRepository())
    for uid, label in (("user_a", "A"), ("user_b", "B")):
        gateway.create_user(CreateUserRequest(user_id=uid, display_name=label))

    private_request = MemoryUpsertRequest(
        user_id="user_a",
        content="用户 A 的私有偏好是中文输出",
        visibility=Visibility.PRIVATE,
    )
    stored = gateway.upsert_memory(private_request)

    # Same query issued by the owner and by an unrelated user.
    owner_hits = gateway.search_memory(MemorySearchRequest(user_id="user_a", query="中文"))
    stranger_hits = gateway.search_memory(MemorySearchRequest(user_id="user_b", query="中文"))

    assert owner_hits["total"] == 1
    assert owner_hits["results"][0]["memory"].id == stored.id
    assert stranger_hits["total"] == 0
def test_workspace_memory_requires_matching_workspace():
    """A WORKSPACE_SHARED memory is visible from its workspace and hidden from another."""
    gateway = MemoryGatewayService(InMemoryRepository())
    shared_request = MemoryUpsertRequest(
        user_id="user_a",
        workspace_id="ws_1",
        content="workspace 共享的项目决策",
        visibility=Visibility.WORKSPACE_SHARED,
    )
    shared = gateway.upsert_memory(shared_request)

    # A different user acting inside the same workspace can fetch the memory by id.
    same_workspace = AccessContext(user_id="user_b", workspace_id="ws_1")
    fetched = gateway.get_memory(shared.id, same_workspace)
    assert fetched.id == shared.id

    # That user searching from a different workspace gets no hits.
    foreign_search = MemorySearchRequest(user_id="user_b", workspace_id="ws_2", query="项目决策")
    foreign_hits = gateway.search_memory(foreign_search)
    assert foreign_hits["total"] == 0
def test_sqlite_repository_persists_memory(tmp_path):
    """A memory written through one SQLiteRepository is readable through a second one on the same file."""
    database_file = tmp_path / "memory_gateway.sqlite3"
    expected_content = "持久化 SQLite memory"

    writer = MemoryGatewayService(SQLiteRepository(database_file))
    writer.create_user(CreateUserRequest(user_id="user_a", display_name="A"))
    saved = writer.upsert_memory(MemoryUpsertRequest(user_id="user_a", content=expected_content))

    # Open a fresh repository on the same path so the read cannot reuse
    # the writer's repository object.
    reader = MemoryGatewayService(SQLiteRepository(database_file))
    loaded = reader.get_memory(saved.id, AccessContext(user_id="user_a"))

    assert loaded.content == expected_content
def test_commit_session_promotes_dedupes_and_creates_review_draft(monkeypatch, tmp_path):
    """Commit a two-episode session with EverMemOS disabled and check the outcome.

    Expects one promoted memory, the ``local-disabled`` backend marker, one
    review draft, and a ``Reviews/Queue`` directory inside the temporary vault.
    """
    vault_root = tmp_path / "vault"

    def evermemos_disabled_config():
        # Stand-in for memory_gateway.services.get_config.
        return Config(evermemos=EverMemOSConfig(enabled=False))

    def obsidian_vault_config():
        # Stand-in for memory_gateway.obsidian_review.get_config.
        return Config(obsidian=ObsidianConfig(vault_path=str(vault_root), review_dir="Reviews/Queue"))

    monkeypatch.setattr("memory_gateway.services.get_config", evermemos_disabled_config)
    monkeypatch.setattr("memory_gateway.obsidian_review.get_config", obsidian_vault_config)

    gateway = MemoryGatewayService(InMemoryRepository())
    episodes = (
        ("结论:这个项目必须保留用户隔离和 namespace ACL。", ["decision"]),
        ("重要:这条高价值记忆需要人工 review 后再进入长期记忆。", ["review", "high-value"]),
    )
    for text, labels in episodes:
        gateway.append_episode(
            EpisodeAppendRequest(
                user_id="user_a",
                session_id="sess_1",
                content=text,
                tags=labels,
            )
        )

    commit_request = CommitSessionRequest(
        user_id="user_a",
        session_id="sess_1",
        min_importance=0.6,
    )
    outcome = gateway.commit_session("sess_1", commit_request)

    assert len(outcome["promoted"]) == 1
    assert outcome["evermemos_backend"] == "local-disabled"
    assert len(outcome["review_drafts"]) == 1
    assert (vault_root / "Reviews" / "Queue").exists()
def test_commit_session_uses_external_evermemos(monkeypatch):
    """Commit a session with EverMemOS enabled and a fake external client.

    Expects the ``external`` backend marker, one promoted memory, and that the
    promoted memory can be found through a later search.
    """

    def evermemos_enabled_config():
        # Stand-in for memory_gateway.services.get_config.
        return Config(evermemos=EverMemOSConfig(enabled=True, fallback_to_local=False))

    monkeypatch.setattr("memory_gateway.services.get_config", evermemos_enabled_config)

    class FakeEverMemOSClient:
        """Minimal client double returning a canned consolidation payload."""

        def consolidate_session(self, **kwargs):
            # Build a fresh payload on every call; the arguments are ignored.
            promoted_entry = {
                "content": "外部 EverMemOS 总结出的长期记忆",
                "summary": "外部 EverMemOS 长期记忆",
                "memory_type": "summary",
                "tags": ["external-evermemos"],
            }
            return {
                "episodes": 1,
                "candidates": [],
                "promoted": [promoted_entry],
                "duplicates": [],
                "conflicts": [],
                "review_drafts": [],
            }

        def health(self):
            return {"status": "ok"}

    gateway = MemoryGatewayService(InMemoryRepository(), evermemos_client=FakeEverMemOSClient())
    episode = EpisodeAppendRequest(
        user_id="user_a",
        session_id="sess_external",
        content="这条 episode 应该交给外部 EverMemOS。",
    )
    gateway.append_episode(episode)

    commit_request = CommitSessionRequest(user_id="user_a", session_id="sess_external")
    outcome = gateway.commit_session("sess_external", commit_request)

    assert outcome["evermemos_backend"] == "external"
    assert len(outcome["promoted"]) == 1

    # The promoted memory must be searchable afterwards.
    hits = gateway.search_memory(MemorySearchRequest(user_id="user_a", query="外部 EverMemOS"))
    assert hits["total"] == 1
def test_search_fans_out_to_openviking_after_namespace_acl(monkeypatch):
    """Request two namespaces as ``user_a`` and check which one reaches OpenViking.

    The fake client rejects any namespace other than ``user/user_a/long_term``;
    the response must report one OpenViking hit and list only that namespace
    as searched.
    """
    own_namespace = "user/user_a/long_term"
    foreign_namespace = "user/user_b/long_term"

    gateway = MemoryGatewayService(InMemoryRepository())

    class FakeSearchResult:
        results = [{"uri": "viking://user/user_a/long_term/demo", "abstract": "OpenViking result", "score": 0.9}]

    class FakeOpenVikingClient:
        async def search(self, query, namespace=None, limit=None, uri=None):
            # Fail if the gateway forwards a namespace other than the caller's own.
            assert namespace == "user/user_a/long_term"
            return FakeSearchResult()

    async def fake_get_openviking_client():
        return FakeOpenVikingClient()

    monkeypatch.setattr("memory_gateway.services.get_openviking_client", fake_get_openviking_client)

    search_request = MemorySearchRequest(
        user_id="user_a",
        query="demo",
        namespaces=[own_namespace, foreign_namespace],
    )
    outcome = asyncio.run(gateway.search_memory_with_openviking(search_request))

    assert outcome["openviking_total"] == 1
    assert outcome["searched_namespaces"] == [own_namespace]