"""Tests for the OpenViking and EverOS memory-system clients.

The OpenViking tests never touch the network: the client's ``_client``
factory is replaced with one that hands out :class:`FakeAsyncClient`
instances, which log every request and replay scripted responses.
"""

import asyncio

from memory_system_api.clients import EverOSMemorySystemClient, OpenVikingMemorySystemClient


class FakeStore:
    """In-memory stand-in for the credential/session store used by the client."""

    def __init__(self):
        # user_id -> user key
        self.users: dict[str, str] = {}
        # account_id -> account key
        self.accounts: dict[str, str] = {}
        # (user_id, session_id) tuples, in save order
        self.sessions: list[tuple] = []
        # (user_id, session_id, task_id, archive_uri) tuples, in save order
        self.tasks: list[tuple] = []

    def get_user_key(self, user_id: str) -> str | None:
        """Return the stored key for ``user_id``, or ``None`` if unknown."""
        return self.users.get(user_id)

    def save_user_key(self, user_id: str, user_key: str) -> None:
        """Store (or overwrite) the key for ``user_id``."""
        self.users[user_id] = user_key

    def get_account_key(self, account_id: str) -> str | None:
        """Return the stored key for ``account_id``, or ``None`` if unknown."""
        return self.accounts.get(account_id)

    def save_account_key(self, account_id: str, admin_user_id: str, account_key: str) -> None:
        """Store the account key; ``admin_user_id`` is accepted but not recorded."""
        self.accounts[account_id] = account_key

    def account_key_matches(self, account_id: str, account_key: str) -> bool:
        """Return whether ``account_key`` equals the stored key for ``account_id``."""
        return self.accounts.get(account_id) == account_key

    def user_key_matches(self, user_id: str, user_key: str) -> bool:
        """Return whether ``user_key`` equals the stored key for ``user_id``."""
        return self.users.get(user_id) == user_key

    def save_session(self, user_id: str, session_id: str) -> None:
        """Record that a session was saved for ``user_id``."""
        self.sessions.append((user_id, session_id))

    def save_task(
        self,
        user_id: str,
        session_id: str,
        task_id: str,
        archive_uri: str | None,
    ) -> None:
        """Record a saved task together with its (optional) archive URI."""
        self.tasks.append((user_id, session_id, task_id, archive_uri))


class FakeResponse:
    """Minimal HTTP response double exposing ``status_code``, ``json()`` and
    ``raise_for_status()``."""

    def __init__(self, status_code: int, data: dict):
        self.status_code = status_code
        self._data = data

    def json(self) -> dict:
        """Return the canned response body."""
        return self._data

    def raise_for_status(self) -> None:
        """Fail the test if the scripted status is an error (>= 400)."""
        if self.status_code >= 400:
            raise AssertionError(f"unexpected status {self.status_code}")


class FakeAsyncClient:
    """Async HTTP client double that logs requests and replays scripted responses.

    ``calls`` and ``responses`` are shared with the test: every request is
    appended to ``calls`` as ``(method, api_key, headers, path, json)`` and the
    next item is popped from the front of ``responses``.
    """

    def __init__(self, calls: list, responses: list[FakeResponse], api_key: str, headers: dict):
        self.calls = calls
        self.responses = responses
        self.api_key = api_key
        self.headers = headers

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning False lets exceptions raised inside the ``async with`` propagate.
        return False

    def _next_response(self, method: str, path: str) -> FakeResponse:
        """Pop the next scripted response, failing clearly when none are left."""
        if not self.responses:
            raise AssertionError(
                f"unexpected {method} request to {path}: no scripted responses left"
            )
        return self.responses.pop(0)

    async def post(self, path: str, json: dict | None = None) -> FakeResponse:
        """Log a POST request and return the next scripted response."""
        self.calls.append(("post", self.api_key, self.headers, path, json))
        return self._next_response("post", path)

    async def get(self, path: str) -> FakeResponse:
        """Log a GET request (with ``None`` as the body) and return the next response."""
        self.calls.append(("get", self.api_key, self.headers, path, None))
        return self._next_response("get", path)


def _install_fake_transport(client, responses: list[FakeResponse]) -> list:
    """Replace ``client._client`` with a factory producing ``FakeAsyncClient``.

    Returns the list that will accumulate one tuple per request made through
    any client the factory creates. ``responses`` is consumed front-to-back.
    """
    calls: list = []

    def factory(api_key, extra_headers=None):
        return FakeAsyncClient(calls, responses, api_key, extra_headers or {})

    client._client = factory  # type: ignore[method-assign]
    return calls


def test_openviking_rejects_unknown_user_credentials():
    """``credential_for_user`` raises ``PermissionError`` for a key the store lacks."""
    store = FakeStore()
    client = OpenVikingMemorySystemClient(store=store)

    try:
        client.credential_for_user("tom", "missing-key")
    except PermissionError as exc:
        assert "Invalid user key" in str(exc)
    else:
        raise AssertionError("expected PermissionError")


def test_openviking_accepts_matching_user_credentials():
    """A key matching the store yields a credential carrying the expected identity."""
    store = FakeStore()
    store.save_user_key("tom", "tom-key")
    client = OpenVikingMemorySystemClient(store=store)
    client.root_key = "root-key"

    credential = client.credential_for_user("tom", "tom-key", agent_id="sess-1")

    assert credential.api_key == "tom-key"
    assert credential.account_id == "admin"
    assert credential.user_id == "tom"
    assert credential.agent_id == "sess-1"


def test_openviking_create_user_initializes_admin_workspace_first():
    """With no stored account key, the admin account is created before the user."""
    store = FakeStore()
    client = OpenVikingMemorySystemClient(store=store)
    client.root_key = "root-key"
    responses = [
        FakeResponse(
            200,
            {"status": "ok", "result": {"account_id": "admin", "admin_user_id": "admin", "user_key": "admin-key"}},
        ),
        FakeResponse(
            200,
            {"status": "ok", "result": {"account_id": "admin", "user_id": "userA", "user_key": "userA-key"}},
        ),
    ]
    calls = _install_fake_transport(client, responses)

    result = asyncio.run(client.create_user("userA"))

    assert result == {"status": "ok", "result": {"account_id": "admin", "user_id": "userA", "user_key": "userA-key"}}
    assert store.accounts == {"admin": "admin-key"}
    assert store.users == {"admin": "admin-key", "userA": "userA-key"}
    assert calls == [
        (
            "post",
            "root-key",
            {},
            "/api/v1/admin/accounts",
            {"account_id": "admin", "admin_user_id": "admin"},
        ),
        (
            "post",
            "root-key",
            {},
            "/api/v1/admin/accounts/admin/users",
            {"user_id": "userA", "role": "user"},
        ),
    ]


def test_openviking_create_user_reuses_existing_admin_workspace():
    """When the admin account key is already stored, only the user is created."""
    store = FakeStore()
    store.save_account_key("admin", "admin", "admin-key")
    client = OpenVikingMemorySystemClient(store=store)
    client.root_key = "root-key"
    responses = [
        FakeResponse(
            200,
            {"status": "ok", "result": {"account_id": "admin", "user_id": "userB", "user_key": "userB-key"}},
        )
    ]
    calls = _install_fake_transport(client, responses)

    result = asyncio.run(client.create_user("userB"))

    assert result == {"status": "ok", "result": {"account_id": "admin", "user_id": "userB", "user_key": "userB-key"}}
    assert store.users == {"userB": "userB-key"}
    assert calls == [
        (
            "post",
            "root-key",
            {},
            "/api/v1/admin/accounts/admin/users",
            {"user_id": "userB", "role": "user"},
        )
    ]


def test_openviking_user_key_auth_is_used_for_session_create():
    """``ensure_session`` posts with the user's key (not the root key) and saves the session."""
    client = OpenVikingMemorySystemClient(store=FakeStore())
    client.root_key = "root-key"
    responses = [FakeResponse(200, {"status": "ok", "result": {"session_id": "sess-2"}})]
    calls = _install_fake_transport(client, responses)

    credential = client.user_credential("tom-key", "tom", agent_id="sess-2")
    result = asyncio.run(client.ensure_session(credential, "sess-2"))

    assert result == {"status": "ok", "result": {"session_id": "sess-2"}}
    assert client.store.sessions == [("tom", "sess-2")]
    assert calls == [
        (
            "post",
            "tom-key",
            {},
            "/api/v1/sessions",
            {"session_id": "sess-2"},
        )
    ]


def test_openviking_find_uses_current_identity_memory_scope():
    """``find`` targets the calling user's ``memories/`` URI."""
    client = OpenVikingMemorySystemClient(store=FakeStore())
    responses = [FakeResponse(200, {"status": "ok", "result": {"memories": []}})]
    calls = _install_fake_transport(client, responses)

    credential = client.user_credential("tom-key", "tom", agent_id="sess-1")
    result = asyncio.run(client.find(credential, "咖啡", 5))

    assert result == {"status": "ok", "result": {"memories": []}}
    assert calls == [
        (
            "post",
            "tom-key",
            {},
            "/api/v1/search/find",
            {"query": "咖啡", "target_uri": "viking://user/tom/memories/", "limit": 5},
        )
    ]


def test_openviking_search_uses_session_target_uri():
    """``search`` sends the session id and a user/session-scoped target URI."""
    client = OpenVikingMemorySystemClient(store=FakeStore())
    responses = [FakeResponse(200, {"status": "ok", "result": {"memories": []}})]
    calls = _install_fake_transport(client, responses)

    credential = client.user_credential("tom-key", "tom", agent_id="sess-1")
    result = asyncio.run(client.search(credential, "sess-1", "咖啡", 5))

    assert result == {"status": "ok", "result": {"memories": []}}
    assert calls == [
        (
            "post",
            "tom-key",
            {},
            "/api/v1/search/search",
            {"query": "咖啡", "limit": 5, "session_id": "sess-1", "target_uri": "viking://user/tom/sess-1"},
        )
    ]


def test_openviking_get_session_context_uses_user_key_auth():
    """``get_session_context`` issues a GET with the user's key and returns the body as-is."""
    client = OpenVikingMemorySystemClient(store=FakeStore())
    responses = [
        FakeResponse(
            200,
            {
                "status": "ok",
                "result": {
                    "latest_archive_overview": "# Working Memory",
                    "messages": [],
                },
            },
        )
    ]
    calls = _install_fake_transport(client, responses)

    credential = client.user_credential("tom-key", "tom", agent_id="sess-1")
    result = asyncio.run(client.get_session_context(credential, "sess-1"))

    assert result == {
        "status": "ok",
        "result": {
            "latest_archive_overview": "# Working Memory",
            "messages": [],
        },
    }
    assert calls == [
        (
            "get",
            "tom-key",
            {},
            "/api/v1/sessions/sess-1/context",
            None,
        )
    ]


def test_openviking_commit_keeps_no_recent_live_messages():
    """``commit_session`` posts ``keep_recent_count=0`` and records the returned task."""
    client = OpenVikingMemorySystemClient(store=FakeStore())
    responses = [
        FakeResponse(
            200,
            {
                "status": "ok",
                "result": {
                    "status": "accepted",
                    "task_id": "task-1",
                    "archive_uri": "viking://session/tom/sess-1/history/archive_001",
                },
            },
        )
    ]
    calls = _install_fake_transport(client, responses)

    credential = client.user_credential("tom-key", "tom")
    result = asyncio.run(client.commit_session(credential, "sess-1"))

    assert result == {
        "status": "ok",
        "result": {
            "status": "accepted",
            "task_id": "task-1",
            "archive_uri": "viking://session/tom/sess-1/history/archive_001",
        },
    }
    assert client.store.tasks == [("tom", "sess-1", "task-1", "viking://session/tom/sess-1/history/archive_001")]
    assert calls == [
        (
            "post",
            "tom-key",
            {},
            "/api/v1/sessions/sess-1/commit",
            {"keep_recent_count": 0},
        )
    ]


def test_everos_assistant_payload_does_not_use_user_id_as_sender():
    """Assistant messages must not be attributed to the end user's id or name."""
    client = EverOSMemorySystemClient()

    payload = client.build_message_payload(
        user_id="tom",
        session_id="sess-1",
        role="assistant",
        content="我记住了",
    )

    message = payload["messages"][0]
    assert message["role"] == "assistant"
    assert message["sender_id"] != "tom"
    assert message["sender_name"] != "tom"


def test_everos_user_payload_uses_user_id_as_sender():
    """User messages carry the user id as both sender id and sender name."""
    client = EverOSMemorySystemClient()

    payload = client.build_message_payload(
        user_id="tom",
        session_id="sess-1",
        role="user",
        content="我喜欢拿铁",
    )

    message = payload["messages"][0]
    assert message["role"] == "user"
    assert message["sender_id"] == "tom"
    assert message["sender_name"] == "tom"