Files
beaver_project/app-instance/backend/beaver/interfaces/mcp/memory_server.py

211 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Beaver memory MCP server.
This server exposes two internal capabilities as streamable-HTTP MCP tools,
in the most minimal way possible:
1. `memory`
2. `session_search`
How to run:
1. Directly with Python:
`python -m beaver.interfaces.mcp.memory_server --host 127.0.0.1 --port 8001`
2. Or with the FastMCP CLI:
`fastmcp run beaver/interfaces/mcp/memory_server.py:mcp --transport http --port 8001`
The default MCP path is `/mcp`; FastMCP's HTTP transport is streamable HTTP.
"""
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from typing import Any
from beaver.engine.session import SessionManager
from beaver.memory.curated.store import MemoryStore
from beaver.tools.builtins.memory import memory_tool
from beaver.tools.builtins.session_search import session_search as run_session_search
# fastmcp is treated as an optional dependency: when it cannot be imported the
# three names are stubbed so this module still imports, and _require_fastmcp()
# raises a RuntimeError once a server is actually requested.
try:  # pragma: no cover - import guard for environments without fastmcp
    from fastmcp import Context, FastMCP
    from fastmcp.server.lifespan import lifespan
except ModuleNotFoundError:  # pragma: no cover - handled at runtime in main()
    FastMCP = None  # type: ignore[assignment]
    Context = Any  # type: ignore[assignment]
    lifespan = None  # type: ignore[assignment]
def _require_fastmcp() -> None:
    """Ensure the optional ``fastmcp`` package was imported.

    Raises:
        RuntimeError: If either ``FastMCP`` or ``lifespan`` is ``None``,
            i.e. the guarded import at module load did not succeed.
    """
    fastmcp_available = FastMCP is not None and lifespan is not None
    if fastmcp_available:
        return
    message = (
        "fastmcp is not installed. Install it with `pip install fastmcp` "
        "or via this project's dependencies."
    )
    raise RuntimeError(message)
def _resolve_workspace_path(workspace: str | Path | None = None) -> Path:
"""决定 memory server 使用的 workspace 根目录。"""
if workspace is not None:
return Path(workspace).expanduser().resolve()
env_workspace = os.getenv("BEAVER_WORKSPACE")
if env_workspace:
return Path(env_workspace).expanduser().resolve()
return Path.cwd()
def _resolve_memory_dir(workspace: Path) -> Path:
    """Return the default curated-memory directory, ``<workspace>/memory/curated``."""
    return workspace.joinpath("memory", "curated")
def _resolve_session_db_path(workspace: Path) -> Path:
    """Return the default session store path, ``<workspace>/sessions/state.db``."""
    return workspace.joinpath("sessions", "state.db")
def create_memory_server(
    *,
    workspace: str | Path | None = None,
    memory_dir: str | Path | None = None,
    session_db_path: str | Path | None = None,
):
    """Build and return the FastMCP memory server instance.

    Args:
        workspace: Workspace root; resolved by ``_resolve_workspace_path``.
        memory_dir: Curated-memory directory override. A falsy value selects
            the default from ``_resolve_memory_dir``.
        session_db_path: Session database path override. A falsy value
            selects the default from ``_resolve_session_db_path``.

    Returns:
        The ``FastMCP`` server with the ``memory`` and ``session_search``
        tools and a ``GET /health`` route registered.

    Raises:
        RuntimeError: If ``fastmcp`` is not installed.
    """
    _require_fastmcp()

    def _absolute(raw: str | Path) -> Path:
        """Expand ``~`` and resolve an explicitly supplied path override."""
        return Path(raw).expanduser().resolve()

    root = _resolve_workspace_path(workspace)
    curated_dir = _absolute(memory_dir) if memory_dir else _resolve_memory_dir(root)
    if session_db_path:
        sessions_db = _absolute(session_db_path)
    else:
        sessions_db = _resolve_session_db_path(root)

    @lifespan
    async def memory_server_lifespan(_server):
        """Create the shared store and session manager for the server's lifetime."""
        curated_store = MemoryStore(curated_dir)
        curated_store.load_from_disk()
        sessions = SessionManager(workspace=root, db_path=sessions_db)
        # Tools read these entries back through ``ctx.lifespan_context``.
        shared_state = {
            "workspace_path": root,
            "memory_dir": curated_dir,
            "session_db_path": sessions_db,
            "memory_store": curated_store,
            "session_manager": sessions,
        }
        try:
            yield shared_state
        finally:
            sessions.close()

    tool_overview = (
        "Provides two MCP tools: `memory` for durable curated memory CRUD, "
        "and `session_search` for cross-session recall from transcript storage."
    )
    server = FastMCP(
        name="Beaver Memory Server",
        instructions=tool_overview,
        lifespan=memory_server_lifespan,
    )

    @server.custom_route("/health", methods=["GET"])
    async def health_check(_request):
        """Minimal health check for remote liveness probing."""
        from starlette.responses import JSONResponse

        status = {
            "ok": True,
            "server": "beaver-memory",
            "transport": "streamable-http",
            "workspace": str(root),
            "memory_dir": str(curated_dir),
            "session_db_path": str(sessions_db),
        }
        return JSONResponse(status)

    # NOTE: the tool docstrings below are kept verbatim; FastMCP is documented
    # to use a tool function's docstring as the tool description.
    @server.tool()
    async def memory(
        action: str,
        target: str = "memory",
        content: str | None = None,
        old_text: str | None = None,
        ctx: Context | None = None,
    ) -> dict[str, Any]:
        """CRUD for curated memory."""
        if ctx is None:
            raise RuntimeError("FastMCP context is required.")
        shared_store = ctx.lifespan_context["memory_store"]
        payload = memory_tool(
            action=action,
            target=target,
            content=content,
            old_text=old_text,
            store=shared_store,
        )
        # memory_tool's result is passed through json.loads before returning.
        return json.loads(payload)

    @server.tool()
    async def session_search(
        query: str = "",
        role_filter: str | None = None,
        limit: int = 3,
        ctx: Context | None = None,
    ) -> dict[str, Any]:
        """Search prior sessions or browse recent ones."""
        if ctx is None:
            raise RuntimeError("FastMCP context is required.")
        manager = ctx.lifespan_context["session_manager"]
        # Falls back to None when the context has no ``session_id`` attribute.
        active_session = getattr(ctx, "session_id", None)
        payload = await run_session_search(
            query=query,
            role_filter=role_filter,
            limit=limit,
            db=manager,
            current_session_id=active_session,
        )
        return json.loads(payload)

    return server
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the minimal command-line parser for the memory server.

    Options: ``--workspace``, ``--memory-dir`` and ``--session-db`` (each
    defaulting to ``None``), ``--host`` (``127.0.0.1``), ``--port`` (integer,
    ``8001``) and ``--path`` (``/mcp``).
    """
    # Registration order is preserved because it determines the help layout.
    option_specs = (
        ("--workspace", {"default": None, "help": "Workspace root. Defaults to BEAVER_WORKSPACE or cwd."}),
        ("--memory-dir", {"default": None, "help": "Override curated memory directory."}),
        ("--session-db", {"default": None, "help": "Override session SQLite database path."}),
        ("--host", {"default": "127.0.0.1", "help": "HTTP bind host."}),
        ("--port", {"default": 8001, "type": int, "help": "HTTP bind port."}),
        ("--path", {"default": "/mcp", "help": "MCP endpoint path."}),
    )
    cli = argparse.ArgumentParser(
        description="Run Beaver memory MCP server over streamable HTTP.",
    )
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli
def main() -> None:
    """Parse command-line options and run the memory server over streamable HTTP."""
    options = build_arg_parser().parse_args()
    memory_server = create_memory_server(
        workspace=options.workspace,
        memory_dir=options.memory_dir,
        session_db_path=options.session_db,
    )
    # Bind address and endpoint path come straight from the CLI options.
    http_settings = {
        "host": options.host,
        "port": options.port,
        "path": options.path,
    }
    memory_server.run(transport="http", **http_settings)
# Module-level server instance; this is the `mcp` object named by the
# `memory_server.py:mcp` FastMCP CLI invocation in the module docstring.
# It is only built when the fastmcp import succeeded, so importing this
# module without fastmcp installed does not raise here.
if FastMCP is not None:
    mcp = create_memory_server()

# Script entry point, e.g. `python -m beaver.interfaces.mcp.memory_server`.
if __name__ == "__main__":
    main()