54 lines
1.8 KiB
Python
54 lines
1.8 KiB
Python
import asyncio
|
||
|
||
from memory_gateway.evermemos_service import ConsolidateRequest, consolidate_session
|
||
|
||
|
||
def test_evermemos_service_consolidates_session(monkeypatch, tmp_path):
|
||
monkeypatch.setattr(
|
||
"memory_gateway.obsidian_review.get_config",
|
||
lambda: type(
|
||
"Config",
|
||
(),
|
||
{
|
||
"obsidian": type(
|
||
"Obsidian",
|
||
(),
|
||
{"vault_path": str(tmp_path / "vault"), "review_dir": "Reviews/Queue"},
|
||
)()
|
||
},
|
||
)(),
|
||
)
|
||
payload = {
|
||
"session_id": "sess_service",
|
||
"context": {"user_id": "user_a", "agent_id": "agent_a", "workspace_id": "ws_a", "session_id": "sess_service"},
|
||
"episodes": [
|
||
{
|
||
"user_id": "user_a",
|
||
"agent_id": "agent_a",
|
||
"workspace_id": "ws_a",
|
||
"session_id": "sess_service",
|
||
"namespace": "session/sess_service/episodic",
|
||
"content": "结论:EverMemOS 本地服务负责整理稳定长期记忆。",
|
||
"tags": ["decision"],
|
||
},
|
||
{
|
||
"user_id": "user_a",
|
||
"agent_id": "agent_a",
|
||
"workspace_id": "ws_a",
|
||
"session_id": "sess_service",
|
||
"namespace": "session/sess_service/episodic",
|
||
"content": "重要:高价值记忆应该进入 Obsidian review queue。",
|
||
"tags": ["review", "high-value"],
|
||
},
|
||
],
|
||
}
|
||
|
||
response = asyncio.run(consolidate_session(ConsolidateRequest.model_validate(payload)))
|
||
|
||
assert response["status"] == "ok"
|
||
result = response["result"]
|
||
assert result["episodes"] == 2
|
||
assert len(result["candidates"]) == 2
|
||
assert len(result["promoted"]) == 1
|
||
assert len(result["review_drafts"]) == 1
|