添加 Memory Gateway Agent skill及其 CLI 实现,支持用户资源管理和记忆操作

This commit is contained in:
2026-06-12 11:49:04 +08:00
parent a29009dc07
commit 126ae4eafa
7 changed files with 1023 additions and 0 deletions

View File

@ -593,6 +593,25 @@ Gateway 内部通过 `core/everos_client.py` 调用 EverOS
- `search_memory(payload)` -> `POST /api/v1/memory/search`
- `health_check()` -> `GET /health`
## AI Agent Skill
项目提供可供 AI Agent 使用的 Skill
```text
skill/memory-gateway-agent
```
其中 `SKILL.md` 定义 Agent 工作流,`scripts/memory_gateway.py` 提供无额外依赖的命令行客户端,`references/api.md` 提供完整参数说明。使用前设置:
```bash
export MEMORY_GATEWAY_BASE_URL=http://127.0.0.1:8010
export MEMORY_GATEWAY_USER_ID=u_123
export MEMORY_GATEWAY_USER_KEY=uk_xxx
python skill/memory-gateway-agent/scripts/memory_gateway.py health
python skill/memory-gateway-agent/scripts/memory_gateway.py list-resources
```
## 运行测试
```bash

View File

@ -0,0 +1,45 @@
# Memory Gateway Agent Skill Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Create a reusable AI-agent skill that safely operates the Memory Gateway API through a deterministic Python CLI.
**Architecture:** Keep procedural guidance in `SKILL.md`, detailed endpoint schemas in `references/api.md`, and all HTTP/multipart behavior in one standard-library CLI. Read credentials from environment variables or explicit flags and never persist secrets in the skill.
**Tech Stack:** Agent Skills format, Python 3 standard library, pytest, Memory Gateway HTTP API.
---
### Task 1: Scaffold the skill
**Files:**
- Create: `skill/memory-gateway-agent/SKILL.md`
- Create: `skill/memory-gateway-agent/agents/openai.yaml`
- Create: `skill/memory-gateway-agent/scripts/memory_gateway.py`
- Create: `skill/memory-gateway-agent/references/api.md`
- [x] Initialize the standard skill structure with `init_skill.py`.
- [x] Remove generated placeholders and keep only required resources.
### Task 2: Implement and test the CLI
**Files:**
- Create: `tests/test_memory_gateway_skill.py`
- Modify: `skill/memory-gateway-agent/scripts/memory_gateway.py`
- [x] Write failing tests for environment credentials, JSON requests, multipart uploads, and HTTP errors.
- [x] Run the focused tests and confirm they fail for missing implementation.
- [x] Implement the standard-library CLI with commands for health, users, resources, search, add/flush, override, and delete.
- [x] Run the focused tests and confirm they pass.
### Task 3: Author and validate the skill
**Files:**
- Modify: `skill/memory-gateway-agent/SKILL.md`
- Modify: `skill/memory-gateway-agent/references/api.md`
- Modify: `skill/memory-gateway-agent/agents/openai.yaml`
- [x] Document the agent workflow, authentication rules, ownership checks, and safe handling of secrets.
- [x] Document endpoint parameters and CLI examples in the API reference.
- [x] Generate UI metadata with the official skill-creator script.
- [x] Run `quick_validate.py`, CLI `--help`, focused tests, and the full project test suite.

View File

@ -0,0 +1,64 @@
---
name: memory-gateway-agent
description: Use when an AI agent needs to create or authenticate a Memory Gateway user, upload and manage user resources, add or flush chat memories, search scoped memory, or apply user-approved memory corrections and deletions through the Memory Gateway HTTP API.
---
# Memory Gateway Agent
Use the bundled CLI instead of constructing HTTP requests manually. It produces JSON output and uses only the Python standard library.
## Configure
Set these variables before authenticated operations:
```bash
export MEMORY_GATEWAY_BASE_URL="http://127.0.0.1:8010"
export MEMORY_GATEWAY_USER_ID="u_123"
export MEMORY_GATEWAY_USER_KEY="uk_xxx"
SKILL_DIR="/path/to/memory-gateway-agent"
CLI="python $SKILL_DIR/scripts/memory_gateway.py"
```
Do not write a real `user_key` into source files, prompts, logs, or committed documentation. Command-line flags may override environment variables, but environment variables are preferred because process arguments may be observable.
## Workflow
1. Run `$CLI health` when connectivity or EverOS availability is uncertain.
2. Use an existing `user_id` and `user_key`. Run `create-user` only when the user explicitly needs a new Gateway identity.
3. Choose the operation:
- Upload durable user files with `upload-resource`.
- Add conversational or multimodal messages with `add-memory`, then call `flush-memory`.
- Search with the narrowest useful scope.
- List or inspect resources before deleting them.
4. Treat JSON output as the source of truth. Preserve returned `resource_id`, memory `id`, and `session_id` exactly.
5. For override or deletion, use the memory `id` and `session_id` returned by search. Never invent IDs or apply changes across users.
## Common Commands
```bash
$CLI health
$CLI list-resources
$CLI upload-resource ./contract.pdf --title "Contract"
$CLI search "What are the payment terms?" --scope resources --top-k 8
$CLI search "What did we discuss?" --scope current_chat --conversation-id c_456
$CLI override-memory mem_abc --session-id resource:u_123:r_xxx --text "Corrected text"
$CLI delete-memory mem_abc --session-id resource:u_123:r_xxx --reason "User requested deletion"
```
For chat ingestion, put the Gateway `messages` array in a JSON file:
```bash
$CLI add-memory --session-id chat:c_456 --messages /tmp/messages.json
$CLI flush-memory --session-id chat:c_456
```
Read [references/api.md](references/api.md) when choosing scopes, constructing multimodal messages, interpreting errors, or using less common commands.
## Safety Rules
- Do not expose internal file paths. Return the Gateway's `resource://{user_id}/{resource_id}` URI to users.
- Do not claim ingestion succeeded unless the upload status is `extracted` or flush reports success.
- Treat `health.status = degraded` as Gateway available but EverOS unavailable.
- Resource deletion is soft deletion in Gateway search scope and removes the Gateway upload copy; it does not delete EverOS internal indexes.
- Memory override and deletion require an owned `resource:{user_id}:{resource_id}` or `memory_edit:{user_id}` session.
- Ask for confirmation before destructive deletion unless the user's current request explicitly instructs deletion.

View File

@ -0,0 +1,4 @@
interface:
display_name: "Memory Gateway Agent"
short_description: "Operate Memory Gateway resources and user memories"
default_prompt: "Use $memory-gateway-agent to store, search, edit, or delete user memory through Memory Gateway."

View File

@ -0,0 +1,221 @@
# Memory Gateway API Reference
## Contents
- Configuration
- Session IDs
- CLI commands
- Message format
- Search scopes
- Errors and result handling
## Configuration
The CLI reads:
| Variable | Default | Purpose |
|---|---|---|
| `MEMORY_GATEWAY_BASE_URL` | `http://127.0.0.1:8010` | Gateway URL |
| `MEMORY_GATEWAY_USER_ID` | none | Authenticated user ID |
| `MEMORY_GATEWAY_USER_KEY` | none | Authenticated user key |
| `MEMORY_GATEWAY_TIMEOUT_SECONDS` | `120` | HTTP timeout |
Global CLI flags `--base-url`, `--user-id`, `--user-key`, and `--timeout` override these values. Put global flags before the subcommand.
## Session IDs
| Memory source | Format |
|---|---|
| Chat | `chat:{conversation_id}` |
| Uploaded resource | `resource:{user_id}:{resource_id}` |
| Manual correction | `memory_edit:{user_id}` |
Use the exact `session_id` returned by the API whenever possible.
## CLI Commands
Assume:
```bash
CLI="python skill/memory-gateway-agent/scripts/memory_gateway.py"
```
### Health
```bash
$CLI health
```
No credentials required. HTTP 200 may contain `"status": "degraded"` when EverOS is unavailable.
### Create User
```bash
$CLI create-user u_123
```
Returns a randomly generated `user_key`. Store it securely. Repeating the same `user_id` returns the existing key.
### Upload Resource
```bash
$CLI upload-resource ./document.pdf \
--app-id default \
--project-id default \
--title "Document title" \
--description "Optional description"
```
Requires credentials. Supported resources depend on the server MIME allowlist. Success returns:
```json
{
"resource_id": "r_xxx",
"session_id": "resource:u_123:r_xxx",
"uri": "resource://u_123/r_xxx",
"status": "extracted"
}
```
`failed` means the record exists but EverOS ingestion failed. Identical active content for the same user/app/project may return the existing resource.
### List, Get, and Delete Resources
```bash
$CLI list-resources
$CLI get-resource r_xxx
$CLI delete-resource r_xxx
```
Missing or foreign resource details return `{"resources": []}`. Delete excludes the resource from future `resources` searches.
### Search
```bash
$CLI search "payment terms" --scope resources --top-k 8
$CLI search "previous discussion" \
--scope current_chat \
--conversation-id c_456
$CLI search "known preferences" --scope all_user_memory
```
Repeat `--scope` to combine scopes:
```bash
$CLI search "query" --scope current_chat --scope resources \
--conversation-id c_456
```
When no scope is provided, the CLI searches `resources` only unless a conversation ID is supplied; with a conversation ID it searches `current_chat` and `resources`. Explicit `current_chat` scope requires `--conversation-id`.
Each result includes normalized `id`, `session_id`, `text`, `source_scope`, and optional resource metadata. The `raw` field preserves the EverOS response.
### Add and Flush Memory
Create `/tmp/messages.json` containing an array:
```json
[
{
"sender_id": "u_123",
"role": "user",
"timestamp": 1781172177000,
"content": [
{"type": "text", "text": "Remember this note"}
]
}
]
```
Then run:
```bash
$CLI add-memory --session-id chat:c_456 --messages /tmp/messages.json
$CLI flush-memory --session-id chat:c_456
```
`--messages` accepts either a JSON array string or a path to a JSON file. Always flush after all messages for the session have been added.
### Override and Delete Memory
Use IDs from a search result:
```bash
$CLI override-memory mem_abc \
--session-id resource:u_123:r_xxx \
--text "Corrected memory text"
$CLI delete-memory mem_abc \
--session-id resource:u_123:r_xxx \
--reason "User requested deletion"
```
These operations write Gateway overrides or tombstones. They do not modify EverOS files directly. The server rejects sessions not owned by the authenticated user.
## Message Format
Each message requires:
| Field | Type | Notes |
|---|---|---|
| `sender_id` | string | Usually the current user ID |
| `role` | string | `user`, `assistant`, or `tool` |
| `timestamp` | integer | Unix milliseconds, greater than zero |
| `content` | string or array | Text or EverOS content items |
Common content items:
```json
{"type": "text", "text": "Plain text"}
```
```json
{
"type": "image",
"base64": "BASE64_DATA",
"ext": "png",
"name": "image.png"
}
```
```json
{
"type": "audio",
"base64": "BASE64_DATA",
"ext": "wav",
"name": "audio.wav"
}
```
Prefer base64 for local binary files. A `file://` URI is only usable when EverOS can access the same filesystem path.
## Search Scopes
| Scope | Behavior |
|---|---|
| `current_chat` | Searches `chat:{conversation_id}`; provide `--conversation-id` |
| `resources` | Searches extracted, non-deleted resources belonging to the user/app/project |
| `all_user_memory` | Searches user memory without a session filter |
Use the narrowest scope that answers the request. This reduces unrelated results and prevents accidental cross-context use.
## Errors and Result Handling
The CLI exits nonzero and writes JSON to stderr:
```json
{"error": "Memory Gateway returned HTTP 401: invalid user credentials"}
```
Common statuses:
| Status | Meaning |
|---|---|
| `401` | Invalid `user_id` or `user_key` |
| `403` | Memory session is not owned by the user |
| `404` | Resource not found for deletion |
| `413` | Upload exceeds server size limit |
| `415` | MIME type is not allowed |
| `422` | Request fields are invalid or missing |
Do not retry `401`, `403`, `413`, `415`, or `422` without changing the request. Retry connectivity or transient server failures only when appropriate for the caller's workflow.

View File

@ -0,0 +1,466 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import mimetypes
import os
import sys
import uuid
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
class GatewayError(RuntimeError):
pass
class Settings:
def __init__(
self,
base_url: str,
user_id: str | None,
user_key: str | None,
timeout: float = 120.0,
) -> None:
self.base_url = base_url.rstrip("/")
self.user_id = user_id
self.user_key = user_key
self.timeout = timeout
@classmethod
def from_env(cls) -> Settings:
return cls(
base_url=os.environ.get(
"MEMORY_GATEWAY_BASE_URL",
"http://127.0.0.1:8010",
),
user_id=os.environ.get("MEMORY_GATEWAY_USER_ID"),
user_key=os.environ.get("MEMORY_GATEWAY_USER_KEY"),
timeout=float(os.environ.get("MEMORY_GATEWAY_TIMEOUT_SECONDS", "120")),
)
class MemoryGatewayClient:
def __init__(
self,
base_url: str,
*,
user_id: str | None = None,
user_key: str | None = None,
timeout: float = 120.0,
) -> None:
self.base_url = base_url.rstrip("/")
self.user_id = user_id
self.user_key = user_key
self.timeout = timeout
def _credentials(self) -> dict[str, str]:
if not self.user_id or not self.user_key:
raise GatewayError(
"user credentials are required; set MEMORY_GATEWAY_USER_ID and "
"MEMORY_GATEWAY_USER_KEY or pass --user-id and --user-key"
)
return {"user_id": self.user_id, "user_key": self.user_key}
def _request(
self,
method: str,
path: str,
*,
query: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
body: bytes | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
url = f"{self.base_url}{path}"
if query:
url = f"{url}?{urlencode(query, doseq=True)}"
request_headers = dict(headers or {})
request_body = body
if json_body is not None:
request_body = json.dumps(json_body, ensure_ascii=False).encode("utf-8")
request_headers["Content-Type"] = "application/json"
request = Request(
url,
data=request_body,
headers=request_headers,
method=method,
)
try:
with urlopen(request, timeout=self.timeout) as response:
raw = response.read()
except HTTPError as exc:
raw = exc.read()
detail = _error_detail(raw, exc.reason)
raise GatewayError(f"Memory Gateway returned HTTP {exc.code}: {detail}") from exc
except URLError as exc:
raise GatewayError(f"cannot connect to Memory Gateway: {exc.reason}") from exc
if not raw:
return {}
try:
value = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
raise GatewayError("Memory Gateway returned a non-JSON response") from exc
if not isinstance(value, dict):
raise GatewayError("Memory Gateway returned an unexpected JSON response")
return value
def health(self) -> dict[str, Any]:
return self._request("GET", "/health")
def create_user(self, user_id: str) -> dict[str, Any]:
return self._request("POST", "/users", json_body={"user_id": user_id})
def upload_resource(
self,
file_path: Path,
*,
app_id: str = "default",
project_id: str = "default",
title: str | None = None,
description: str | None = None,
) -> dict[str, Any]:
if not file_path.is_file():
raise GatewayError(f"upload file does not exist: {file_path}")
fields: dict[str, str] = {
**self._credentials(),
"app_id": app_id,
"project_id": project_id,
}
if title is not None:
fields["title"] = title
if description is not None:
fields["description"] = description
boundary = f"memory-gateway-{uuid.uuid4().hex}"
mime_type = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
body = _multipart_body(
boundary,
fields,
field_name="file",
file_path=file_path,
mime_type=mime_type,
)
return self._request(
"POST",
"/resources",
body=body,
headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
)
def list_resources(self) -> dict[str, Any]:
return self._request("GET", "/resources", query=self._credentials())
def get_resource(self, resource_id: str) -> dict[str, Any]:
return self._request(
"GET",
f"/resources/{resource_id}",
query=self._credentials(),
)
def delete_resource(self, resource_id: str) -> dict[str, Any]:
return self._request(
"DELETE",
f"/resources/{resource_id}",
query=self._credentials(),
)
def search(
self,
query: str,
*,
conversation_id: str | None = None,
scopes: list[str] | None = None,
top_k: int = 8,
app_id: str = "default",
project_id: str = "default",
) -> dict[str, Any]:
selected_scopes = scopes or (
["current_chat", "resources"] if conversation_id else ["resources"]
)
if "current_chat" in selected_scopes and not conversation_id:
raise GatewayError(
"conversation_id is required when search scope includes current_chat"
)
payload: dict[str, Any] = {
**self._credentials(),
"query": query,
"scope": selected_scopes,
"top_k": top_k,
"app_id": app_id,
"project_id": project_id,
}
if conversation_id is not None:
payload["conversation_id"] = conversation_id
return self._request("POST", "/memories/search", json_body=payload)
def add_memory(
self,
session_id: str,
messages: list[dict[str, Any]],
*,
app_id: str = "default",
project_id: str = "default",
) -> dict[str, Any]:
return self._request(
"POST",
"/memories/add",
json_body={
**self._credentials(),
"session_id": session_id,
"messages": messages,
"app_id": app_id,
"project_id": project_id,
},
)
def flush_memory(
self,
session_id: str,
*,
app_id: str = "default",
project_id: str = "default",
) -> dict[str, Any]:
return self._request(
"POST",
"/memories/flush",
json_body={
**self._credentials(),
"session_id": session_id,
"app_id": app_id,
"project_id": project_id,
},
)
def override_memory(
self,
memory_id: str,
session_id: str,
override_text: str,
) -> dict[str, Any]:
return self._request(
"PATCH",
f"/memories/{memory_id}",
json_body={
**self._credentials(),
"session_id": session_id,
"override_text": override_text,
},
)
def delete_memory(
self,
memory_id: str,
session_id: str,
*,
reason: str | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
**self._credentials(),
"session_id": session_id,
}
if reason is not None:
payload["reason"] = reason
return self._request(
"DELETE",
f"/memories/{memory_id}",
json_body=payload,
)
def _error_detail(raw: bytes, fallback: Any) -> str:
try:
body = json.loads(raw.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
return str(fallback)
if isinstance(body, dict) and body.get("detail"):
return str(body["detail"])
return str(fallback)
def _multipart_body(
boundary: str,
fields: dict[str, str],
*,
field_name: str,
file_path: Path,
mime_type: str,
) -> bytes:
marker = boundary.encode("ascii")
chunks: list[bytes] = []
for name, value in fields.items():
chunks.extend(
[
b"--" + marker + b"\r\n",
f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode(),
value.encode("utf-8"),
b"\r\n",
]
)
chunks.extend(
[
b"--" + marker + b"\r\n",
(
f'Content-Disposition: form-data; name="{field_name}"; '
f'filename="{file_path.name}"\r\n'
).encode(),
f"Content-Type: {mime_type}\r\n\r\n".encode(),
file_path.read_bytes(),
b"\r\n--" + marker + b"--\r\n",
]
)
return b"".join(chunks)
def _load_json_array(value: str) -> list[dict[str, Any]]:
source = Path(value)
text = source.read_text(encoding="utf-8") if source.is_file() else value
try:
parsed = json.loads(text)
except json.JSONDecodeError as exc:
raise GatewayError(f"invalid messages JSON: {exc}") from exc
if not isinstance(parsed, list) or not all(isinstance(item, dict) for item in parsed):
raise GatewayError("messages JSON must be an array of objects")
return parsed
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Memory Gateway agent CLI")
parser.add_argument("--base-url")
parser.add_argument("--user-id")
parser.add_argument("--user-key")
parser.add_argument("--timeout", type=float)
subparsers = parser.add_subparsers(dest="command", required=True)
subparsers.add_parser("health")
create_user = subparsers.add_parser("create-user")
create_user.add_argument("user_id")
upload = subparsers.add_parser("upload-resource")
upload.add_argument("file", type=Path)
_add_scope_arguments(upload)
upload.add_argument("--title")
upload.add_argument("--description")
subparsers.add_parser("list-resources")
get_resource = subparsers.add_parser("get-resource")
get_resource.add_argument("resource_id")
delete_resource = subparsers.add_parser("delete-resource")
delete_resource.add_argument("resource_id")
search = subparsers.add_parser("search")
search.add_argument("query")
search.add_argument("--conversation-id")
search.add_argument(
"--scope",
action="append",
choices=["current_chat", "resources", "all_user_memory"],
)
search.add_argument("--top-k", type=int, default=8)
_add_scope_arguments(search)
add = subparsers.add_parser("add-memory")
add.add_argument("--session-id", required=True)
add.add_argument(
"--messages",
required=True,
help="JSON array or path to a JSON file containing messages",
)
_add_scope_arguments(add)
flush = subparsers.add_parser("flush-memory")
flush.add_argument("--session-id", required=True)
_add_scope_arguments(flush)
override = subparsers.add_parser("override-memory")
override.add_argument("memory_id")
override.add_argument("--session-id", required=True)
override.add_argument("--text", required=True)
delete_memory = subparsers.add_parser("delete-memory")
delete_memory.add_argument("memory_id")
delete_memory.add_argument("--session-id", required=True)
delete_memory.add_argument("--reason")
return parser
def _add_scope_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--app-id", default="default")
parser.add_argument("--project-id", default="default")
def main(argv: list[str] | None = None) -> int:
settings = Settings.from_env()
args = build_parser().parse_args(argv)
client = MemoryGatewayClient(
args.base_url or settings.base_url,
user_id=args.user_id or settings.user_id,
user_key=args.user_key or settings.user_key,
timeout=args.timeout or settings.timeout,
)
try:
result = _run_command(client, args)
except GatewayError as exc:
print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
return 1
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
def _run_command(client: MemoryGatewayClient, args: argparse.Namespace) -> dict[str, Any]:
if args.command == "health":
return client.health()
if args.command == "create-user":
return client.create_user(args.user_id)
if args.command == "upload-resource":
return client.upload_resource(
args.file,
app_id=args.app_id,
project_id=args.project_id,
title=args.title,
description=args.description,
)
if args.command == "list-resources":
return client.list_resources()
if args.command == "get-resource":
return client.get_resource(args.resource_id)
if args.command == "delete-resource":
return client.delete_resource(args.resource_id)
if args.command == "search":
return client.search(
args.query,
conversation_id=args.conversation_id,
scopes=args.scope,
top_k=args.top_k,
app_id=args.app_id,
project_id=args.project_id,
)
if args.command == "add-memory":
return client.add_memory(
args.session_id,
_load_json_array(args.messages),
app_id=args.app_id,
project_id=args.project_id,
)
if args.command == "flush-memory":
return client.flush_memory(
args.session_id,
app_id=args.app_id,
project_id=args.project_id,
)
if args.command == "override-memory":
return client.override_memory(args.memory_id, args.session_id, args.text)
if args.command == "delete-memory":
return client.delete_memory(
args.memory_id,
args.session_id,
reason=args.reason,
)
raise GatewayError(f"unsupported command: {args.command}")
if __name__ == "__main__":
raise SystemExit(main())

View File

@ -0,0 +1,204 @@
from __future__ import annotations
import importlib.util
import json
from pathlib import Path
from urllib.error import HTTPError
import pytest
SCRIPT_PATH = (
Path(__file__).parents[1]
/ "skill"
/ "memory-gateway-agent"
/ "scripts"
/ "memory_gateway.py"
)
def load_cli():
spec = importlib.util.spec_from_file_location("memory_gateway_skill_cli", SCRIPT_PATH)
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class FakeResponse:
def __init__(self, body: dict[str, object], status: int = 200) -> None:
self.body = json.dumps(body).encode()
self.status = status
def __enter__(self):
return self
def __exit__(self, *args: object) -> None:
return None
def read(self) -> bytes:
return self.body
def close(self) -> None:
return None
def test_settings_read_gateway_credentials_from_environment(
monkeypatch: pytest.MonkeyPatch,
) -> None:
cli = load_cli()
monkeypatch.setenv("MEMORY_GATEWAY_BASE_URL", "http://gateway.test/")
monkeypatch.setenv("MEMORY_GATEWAY_USER_ID", "u_agent")
monkeypatch.setenv("MEMORY_GATEWAY_USER_KEY", "uk_secret")
settings = cli.Settings.from_env()
assert settings.base_url == "http://gateway.test"
assert settings.user_id == "u_agent"
assert settings.user_key == "uk_secret"
def test_json_request_sends_authenticated_payload(
monkeypatch: pytest.MonkeyPatch,
) -> None:
cli = load_cli()
captured: dict[str, object] = {}
def fake_urlopen(request, timeout):
captured["url"] = request.full_url
captured["method"] = request.method
captured["body"] = json.loads(request.data)
captured["content_type"] = request.headers["Content-type"]
captured["timeout"] = timeout
return FakeResponse({"results": []})
monkeypatch.setattr(cli, "urlopen", fake_urlopen)
client = cli.MemoryGatewayClient(
"http://gateway.test",
user_id="u_agent",
user_key="uk_secret",
timeout=9,
)
result = client.search("contract", scopes=["resources"], top_k=5)
assert result == {"results": []}
assert captured == {
"url": "http://gateway.test/memories/search",
"method": "POST",
"body": {
"user_id": "u_agent",
"user_key": "uk_secret",
"query": "contract",
"scope": ["resources"],
"top_k": 5,
"app_id": "default",
"project_id": "default",
},
"content_type": "application/json",
"timeout": 9,
}
def test_upload_builds_multipart_request_without_exposing_file_uri(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
cli = load_cli()
upload = tmp_path / "note.txt"
upload.write_text("remember this", encoding="utf-8")
captured: dict[str, object] = {}
def fake_urlopen(request, timeout):
captured["url"] = request.full_url
captured["method"] = request.method
captured["body"] = request.data
captured["content_type"] = request.headers["Content-type"]
return FakeResponse(
{
"resource_id": "r_1",
"uri": "resource://u_agent/r_1",
"status": "extracted",
}
)
monkeypatch.setattr(cli, "urlopen", fake_urlopen)
client = cli.MemoryGatewayClient(
"http://gateway.test",
user_id="u_agent",
user_key="uk_secret",
)
result = client.upload_resource(upload, title="Agent note")
body = captured["body"]
assert isinstance(body, bytes)
assert captured["url"] == "http://gateway.test/resources"
assert captured["method"] == "POST"
assert str(captured["content_type"]).startswith("multipart/form-data; boundary=")
assert b'name="user_id"' in body
assert b"u_agent" in body
assert b'name="file"; filename="note.txt"' in body
assert b"remember this" in body
assert b"file://" not in body
assert result["uri"] == "resource://u_agent/r_1"
def test_http_error_raises_gateway_error_without_leaking_user_key(
monkeypatch: pytest.MonkeyPatch,
) -> None:
cli = load_cli()
def fake_urlopen(request, timeout):
raise HTTPError(
request.full_url,
401,
"Unauthorized",
hdrs=None,
fp=FakeResponse({"detail": "invalid user credentials"}),
)
monkeypatch.setattr(cli, "urlopen", fake_urlopen)
client = cli.MemoryGatewayClient(
"http://gateway.test",
user_id="u_agent",
user_key="uk_super_secret",
)
with pytest.raises(cli.GatewayError) as exc_info:
client.list_resources()
message = str(exc_info.value)
assert "401" in message
assert "invalid user credentials" in message
assert "uk_super_secret" not in message
def test_load_messages_accepts_large_inline_json() -> None:
cli = load_cli()
value = json.dumps(
[
{
"sender_id": "u_agent",
"role": "user",
"timestamp": 1234567890123,
"content": "x" * 5000,
}
]
)
messages = cli._load_json_array(value)
assert messages[0]["content"] == "x" * 5000
def test_search_requires_conversation_id_for_current_chat_scope() -> None:
cli = load_cli()
client = cli.MemoryGatewayClient(
"http://gateway.test",
user_id="u_agent",
user_key="uk_secret",
)
with pytest.raises(cli.GatewayError, match="conversation_id"):
client.search("what did we discuss", scopes=["current_chat"])