"""Beaver memory MCP server. 这个 server 用最精简的方式把两个内部能力暴露成 streamable-http MCP tools: 1. `memory` 2. `session_search` 运行方式: 1. 直接用 Python: `python -m beaver.interfaces.mcp.memory_server --host 127.0.0.1 --port 8001` 2. 或者用 FastMCP CLI: `fastmcp run beaver/interfaces/mcp/memory_server.py:mcp --transport http --port 8001` 默认 MCP 路径是 `/mcp`,FastMCP 的 HTTP transport 就是 streamable HTTP。 """ from __future__ import annotations import argparse import json import os from pathlib import Path from typing import Any from beaver.engine.session import SessionManager from beaver.memory.curated.store import MemoryStore from beaver.tools.builtins.memory import memory_tool from beaver.tools.builtins.session_search import session_search as run_session_search try: # pragma: no cover - import guard for environments without fastmcp from fastmcp import Context, FastMCP from fastmcp.server.lifespan import lifespan except ModuleNotFoundError: # pragma: no cover - handled at runtime in main() FastMCP = None # type: ignore[assignment] Context = Any # type: ignore[assignment] lifespan = None # type: ignore[assignment] def _require_fastmcp() -> None: if FastMCP is None or lifespan is None: raise RuntimeError( "fastmcp is not installed. Install it with `pip install fastmcp` " "or via this project's dependencies." ) def _resolve_workspace_path(workspace: str | Path | None = None) -> Path: """决定 memory server 使用的 workspace 根目录。""" if workspace is not None: return Path(workspace).expanduser().resolve() env_workspace = os.getenv("BEAVER_WORKSPACE") if env_workspace: return Path(env_workspace).expanduser().resolve() return Path.cwd() def _resolve_memory_dir(workspace: Path) -> Path: """curated memory 的默认目录。""" return workspace / "memory" / "curated" def _resolve_session_db_path(workspace: Path) -> Path: """session store 的默认路径。""" return workspace / "sessions" / "state.db" def create_memory_server( *, workspace: str | Path | None = None, memory_dir: str | Path | None = None, session_db_path: str | Path | None = None, ): """创建并返回 FastMCP memory server 实例。""" _require_fastmcp() workspace_path = _resolve_workspace_path(workspace) resolved_memory_dir = Path(memory_dir).expanduser().resolve() if memory_dir else _resolve_memory_dir(workspace_path) resolved_session_db = ( Path(session_db_path).expanduser().resolve() if session_db_path else _resolve_session_db_path(workspace_path) ) @lifespan async def memory_server_lifespan(_server): """在 server 生命周期内初始化共享 store/db。""" store = MemoryStore(resolved_memory_dir) store.load_from_disk() session_manager = SessionManager(workspace=workspace_path, db_path=resolved_session_db) try: yield { "workspace_path": workspace_path, "memory_dir": resolved_memory_dir, "session_db_path": resolved_session_db, "memory_store": store, "session_manager": session_manager, } finally: session_manager.close() server = FastMCP( name="Beaver Memory Server", instructions=( "Provides two MCP tools: `memory` for durable curated memory CRUD, " "and `session_search` for cross-session recall from transcript storage." ), lifespan=memory_server_lifespan, ) @server.custom_route("/health", methods=["GET"]) async def health_check(_request): """最小 health check,方便远程探活。""" from starlette.responses import JSONResponse return JSONResponse( { "ok": True, "server": "beaver-memory", "transport": "streamable-http", "workspace": str(workspace_path), "memory_dir": str(resolved_memory_dir), "session_db_path": str(resolved_session_db), } ) @server.tool() async def memory( action: str, target: str = "memory", content: str | None = None, old_text: str | None = None, ctx: Context | None = None, ) -> dict[str, Any]: """CRUD for curated memory.""" if ctx is None: raise RuntimeError("FastMCP context is required.") raw_result = memory_tool( action=action, target=target, content=content, old_text=old_text, store=ctx.lifespan_context["memory_store"], ) return json.loads(raw_result) @server.tool() async def session_search( query: str = "", role_filter: str | None = None, limit: int = 3, ctx: Context | None = None, ) -> dict[str, Any]: """Search prior sessions or browse recent ones.""" if ctx is None: raise RuntimeError("FastMCP context is required.") raw_result = await run_session_search( query=query, role_filter=role_filter, limit=limit, db=ctx.lifespan_context["session_manager"], current_session_id=getattr(ctx, "session_id", None), ) return json.loads(raw_result) return server def build_arg_parser() -> argparse.ArgumentParser: """构建最小命令行参数解析器。""" parser = argparse.ArgumentParser(description="Run Beaver memory MCP server over streamable HTTP.") parser.add_argument("--workspace", default=None, help="Workspace root. Defaults to BEAVER_WORKSPACE or cwd.") parser.add_argument("--memory-dir", default=None, help="Override curated memory directory.") parser.add_argument("--session-db", default=None, help="Override session SQLite database path.") parser.add_argument("--host", default="127.0.0.1", help="HTTP bind host.") parser.add_argument("--port", default=8001, type=int, help="HTTP bind port.") parser.add_argument("--path", default="/mcp", help="MCP endpoint path.") return parser def main() -> None: """以 streamable HTTP 启动 memory server。""" parser = build_arg_parser() args = parser.parse_args() server = create_memory_server( workspace=args.workspace, memory_dir=args.memory_dir, session_db_path=args.session_db, ) server.run( transport="http", host=args.host, port=args.port, path=args.path, ) if FastMCP is not None: mcp = create_memory_server() if __name__ == "__main__": main()