更新文档,调整内存 URI 格式和写入模式示例;移除过时的 OpenViking 内容补充逻辑,并添加相应的单元测试

This commit is contained in:
2026-06-04 09:34:37 +08:00
parent 8c479e4ecd
commit 5500947ddb
4 changed files with 47 additions and 112 deletions

View File

@ -420,6 +420,29 @@ def test_write_memory_delegates_to_openviking_content_write_only():
assert everos.calls == []
def test_write_memory_supports_openviking_create_mode():
openviking = FakeOpenViking()
everos = FakeEverOS()
service = MemorySystemService(openviking=openviking, everos=everos)
response = asyncio.run(service.write_memory(MemoryWriteRequest(
user_id="tom",
user_key="tom-key",
uri="viking://user/tom/memories/preferences/饮食偏好.md",
content="用户喜欢喝茶",
mode="create",
wait=True,
)))
assert response.status == "success"
assert response.memory == {"status": "ok", "result": {"uri": "viking://user/tom/memories/preferences/饮食偏好.md", "mode": "create"}}
assert openviking.calls == [
("credential_for_user", "tom", "tom-key", None),
("write_memory", "key-tom", "viking://user/tom/memories/preferences/饮食偏好.md", "用户喜欢喝茶", "create", True),
]
assert everos.calls == []
def test_delete_memory_delegates_to_openviking_only_and_defaults_non_recursive():
openviking = FakeOpenViking()
everos = FakeEverOS()
@ -488,59 +511,6 @@ def test_search_returns_compact_items_and_backend_diagnostics_without_duplicate_
assert not _has_key(response.backends["everos"].result, "original_data")
def test_search_includes_latest_openviking_memory_content_for_memory_hits():
class FakeOpenVikingWithStaleSearch(FakeOpenViking):
async def search(
self,
user_key: str,
query: str,
limit: int,
level: int = 2,
score_threshold: float = 0.8,
target_uri: str = "viking://user/memories",
) -> dict:
self.calls.append(("search", user_key, query, limit, level, score_threshold, target_uri))
return {
"status": "ok",
"result": {
"memories": [
{
"context_type": "memory",
"uri": "viking://user/tom/memories/preferences/drink.md",
"level": 2,
"score": 0.91,
"abstract": "用户喜欢拿铁咖啡。",
}
],
"resources": [],
"skills": [],
"total": 1,
},
}
async def read_memory(self, user_key: str, uri: str) -> dict:
self.calls.append(("read_memory", user_key, uri))
return {"status": "ok", "result": {"uri": uri, "content": "用户不喜欢咖啡,只喜欢下午喝奶茶"}}
openviking = FakeOpenVikingWithStaleSearch()
service = MemorySystemService(openviking=openviking, everos=FakeEverOS())
response = asyncio.run(service.search(
SearchRequest(user_id="tom", user_key="tom-key", session_id="sess-1", query="我喜欢喝什么?", limit=5),
))
assert response.items[0] == {
"source_backend": "openviking",
"context_type": "memory",
"uri": "viking://user/tom/memories/preferences/drink.md",
"level": 2,
"score": 0.91,
"abstract": "用户喜欢拿铁咖啡。",
"content": "用户不喜欢咖啡,只喜欢下午喝奶茶",
}
assert ("read_memory", "key-tom", "viking://user/tom/memories/preferences/drink.md") in openviking.calls
def test_session_context_combines_openviking_context_and_everos_search_items():
openviking = FakeOpenViking()
everos = FakeEverOSVerbose()