Files
steven_li 3b0af173cc refactor(beaver): 移除Hermes相关引用和迁移代码,完善Beaver后端主线实现
移除了所有Hermes相关的命名引用,包括:
- 从.gitignore中清理相关构建缓存文件
- 将README中的beaver-home路径配置更新
- 完善backend/README.md文档说明Beaver后端主线实现
- 移除Hermes风格的相关注释和兼容性代码
- 清理nanobot环境变量兼容性处理
- 删除技能迁移和服务迁移相关功能代码
- 更新测试用例中相关命名和函数名

BREAKING CHANGE: 移除了Hermes迁移相关API和CLI命令,不再支持nanobot环境变量兼容性
2026-05-14 17:20:32 +08:00

464 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Beaver 的精炼长期记忆存储层。
这个文件实现的是 Beaver curated memory 模型,目标不是
“把所有历史都存下来”,而是只保存跨会话仍然值得保留的稳定事实。
核心设计:
1. 只保留两个持久化记忆桶:
- ``memory``: agent 自己对环境、项目、工具 quirks 的长期备注
- ``user``: 对用户偏好、习惯、身份信息的长期理解
2. ``replace`` / ``remove`` 不使用 UUID,而是使用短语义片段做子串匹配。
这是为了适配 LLM 更擅长“记住一句话片段”而不是“追踪一个随机 ID”的现实。
3. 写入前先做安全扫描,避免把 prompt injection / secrets exfiltration
一类危险内容写入长期记忆,再在未来会话中反向污染 system prompt。
4. 写入协议严格遵守:
- scan
- lock
- reload
- validate
- atomic write
5. 本文件维护两份状态:
- live state: 当前内存中的真实条目,tool 写入后立刻变化
- frozen snapshot: 会话开始时冻结的一份 prompt 注入快照
其中最重要的一点是:本会话中新增的记忆会立刻写盘,但不会反向修改本会话
已经冻结的 system prompt。这样可以保住 prefix cache,也避免“会话中途 prompt
变了导致行为抖动”的问题。
"""
from __future__ import annotations
import os
import re
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Any
try:
import fcntl
except ImportError: # pragma: no cover - Windows fallback
fcntl = None
try:
import msvcrt
except ImportError: # pragma: no cover - Unix platforms
msvcrt = None
# Separator placed between entries when a bucket is serialized to its file,
# and the token the file is split on when it is read back.
ENTRY_DELIMITER = "\n§\n"
# File names, relative to the store root, of the two persistent buckets.
DEFAULT_MEMORY_FILENAME = "MEMORY.md"
DEFAULT_USER_FILENAME = "USER.md"
# (regex, identifier) pairs; a case-insensitive match of any regex blocks the
# write. The identifier is echoed back in the error message.
_MEMORY_THREAT_PATTERNS: list[tuple[str, str]] = [
    (r"ignore\s+(previous|all|above|prior)\s+instructions", "prompt_injection"),
    (r"you\s+are\s+now\s+", "role_hijack"),
    (r"do\s+not\s+tell\s+the\s+user", "deception_hide"),
    (r"system\s+prompt\s+override", "sys_prompt_override"),
    (r"disregard\s+(your|all|any)\s+(instructions|rules|guidelines)", "disregard_rules"),
    (r"act\s+as\s+(if|though)\s+you\s+(have\s+no|don't\s+have)\s+(restrictions|limits|rules)", "bypass_restrictions"),
    (r"curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)", "exfil_curl"),
    (r"wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)", "exfil_wget"),
    (r"cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)", "read_secrets"),
    (r"authorized_keys", "ssh_backdoor"),
    (r"\$HOME/\.ssh|\~/\.ssh", "ssh_access"),
    (r"\$HOME/\.beaver/\.env|\~/\.beaver/\.env", "beaver_env"),
]

# Zero-width and bidirectional-control code points that are rejected outright.
_INVISIBLE_CHARS = {
    "\u200b",
    "\u200c",
    "\u200d",
    "\u2060",
    "\ufeff",
    "\u202a",
    "\u202b",
    "\u202c",
    "\u202d",
    "\u202e",
}


def scan_memory_content(content: str) -> str | None:
    """Screen content before it is written to long-term memory.

    This is a minimal gate, not a complete security audit. Memory entries are
    injected into future system prompts, so malicious text stored here is far
    more dangerous than text in an ordinary, temporary context.

    Args:
        content: The candidate memory entry.

    Returns:
        A human-readable ``"Blocked: ..."`` message when the content is
        rejected, or ``None`` when it passes both checks.
    """
    # Walk the content in order so the reported character is always the first
    # offender. Iterating the set instead would make the message depend on
    # set iteration order, which changes with string hash randomization.
    for char in content:
        if char in _INVISIBLE_CHARS:
            return (
                "Blocked: content contains invisible unicode character "
                f"U+{ord(char):04X}."
            )

    for pattern, pattern_id in _MEMORY_THREAT_PATTERNS:
        if re.search(pattern, content, re.IGNORECASE):
            return (
                f"Blocked: content matches threat pattern '{pattern_id}'. "
                "Memory entries are injected into future system prompts."
            )
    return None
class MemoryStore:
    """Long-term memory store with a per-bucket character budget.

    Responsibilities:

    1. Load ``MEMORY.md`` / ``USER.md`` from disk.
    2. Freeze the prompt snapshot when a session starts.
    3. Provide safe write operations: ``add`` / ``replace`` / ``remove``.
    4. Keep the live state and the frozen snapshot separate.

    Out of scope:

    1. Automatically extracting things to remember from a conversation.
    2. Session transcript retrieval.
    3. Learning and publishing skills.

    The path, entry and limit helpers treat every ``target`` other than
    ``"user"`` as the ``"memory"`` bucket.
    """

    def __init__(
        self,
        root: str | Path,
        *,
        memory_char_limit: int = 2200,
        user_char_limit: int = 1375,
    ) -> None:
        """Create a store rooted at ``root``; nothing is read from disk yet.

        Args:
            root: Directory that holds the memory files.
            memory_char_limit: Character budget of the ``memory`` bucket.
            user_char_limit: Character budget of the ``user`` bucket.
        """
        self.root = Path(root)
        self.memory_char_limit = memory_char_limit
        self.user_char_limit = user_char_limit
        self.memory_entries: list[str] = []
        self.user_entries: list[str] = []
        self._system_prompt_snapshot: dict[str, str] = {"memory": "", "user": ""}

    def load_from_disk(self) -> None:
        """Load the live state from disk and freeze the prompt snapshot.

        Intended to be called once when a session starts, not after every
        tool write: re-rendering the snapshot after each write would change
        the system prompt prefix in the middle of a session, which is exactly
        what the frozen snapshot exists to prevent.
        """
        self.root.mkdir(parents=True, exist_ok=True)
        # dict.fromkeys drops exact duplicates while keeping first-seen order.
        self.memory_entries = list(dict.fromkeys(self._read_file(self._path_for("memory"))))
        self.user_entries = list(dict.fromkeys(self._read_file(self._path_for("user"))))
        self._system_prompt_snapshot = {
            "memory": self._render_block("memory", self.memory_entries),
            "user": self._render_block("user", self.user_entries),
        }

    @contextmanager
    def _file_lock(self, path: Path):
        """Hold an exclusive lock associated with ``path`` for the ``with`` body.

        The lock is taken on a sibling ``.lock`` file rather than on the data
        file itself: the data file is replaced via ``os.replace``, so a lock
        held on its handle would not reliably cover the replacement file.

        When neither ``fcntl`` nor ``msvcrt`` is available the body runs
        without any locking.
        """
        lock_path = path.with_suffix(path.suffix + ".lock")
        lock_path.parent.mkdir(parents=True, exist_ok=True)

        if fcntl is None and msvcrt is None:
            yield
            return

        # msvcrt.locking locks a byte range, so the file needs at least one byte.
        if msvcrt and (not lock_path.exists() or lock_path.stat().st_size == 0):
            lock_path.write_text(" ", encoding="utf-8")

        # ``with`` guarantees the handle is closed even if unlocking raises.
        with open(lock_path, "r+" if msvcrt else "a+", encoding="utf-8") as fd:
            if fcntl is not None:
                fcntl.flock(fd, fcntl.LOCK_EX)
            else:  # pragma: no cover - Windows fallback
                fd.seek(0)
                msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1)
            # Only reached once the lock is held, so the unlock below never
            # runs for a lock that was not acquired.
            try:
                yield
            finally:
                if fcntl is not None:
                    fcntl.flock(fd, fcntl.LOCK_UN)
                else:  # pragma: no cover - Windows fallback
                    try:
                        fd.seek(0)
                        msvcrt.locking(fd.fileno(), msvcrt.LK_UNLCK, 1)
                    except OSError:
                        pass

    def _path_for(self, target: str) -> Path:
        """Return the file path backing the ``target`` bucket."""
        if target == "user":
            return self.root / DEFAULT_USER_FILENAME
        return self.root / DEFAULT_MEMORY_FILENAME

    def _entries_for(self, target: str) -> list[str]:
        """Return the live entry list of the ``target`` bucket (not a copy)."""
        if target == "user":
            return self.user_entries
        return self.memory_entries

    def _set_entries(self, target: str, entries: list[str]) -> None:
        """Replace the in-memory live entries of the ``target`` bucket."""
        if target == "user":
            self.user_entries = entries
        else:
            self.memory_entries = entries

    def _char_limit(self, target: str) -> int:
        """Return the character budget of the ``target`` bucket.

        Budgets are counted in characters rather than tokens because a
        character count is stable and independent of any particular model.
        """
        return self.user_char_limit if target == "user" else self.memory_char_limit

    def _char_count(self, target: str) -> int:
        """Return the characters currently used by the live ``target`` bucket."""
        entries = self._entries_for(target)
        return len(ENTRY_DELIMITER.join(entries)) if entries else 0

    @staticmethod
    def _usage_percent(current: int, limit: int) -> int:
        """Return ``current / limit`` as a whole percentage capped at 100."""
        return min(100, int((current / limit) * 100)) if limit > 0 else 0

    @staticmethod
    def _delimiter_error(content: str) -> str | None:
        """Return an error message if ``content`` would corrupt the file format.

        Entries are joined with ``ENTRY_DELIMITER`` on write and split on it
        on read. Content that contains the delimiter, or that ends with the
        delimiter minus its trailing newline, would come back as different
        entries after the next reload.
        """
        if ENTRY_DELIMITER in f"{content}\n":
            marker = ENTRY_DELIMITER.strip()
            return (
                f"Content cannot contain a line consisting only of '{marker}'; "
                "it is reserved as the entry delimiter."
            )
        return None

    @staticmethod
    def _locate_entry(
        entries: list[str], old_text: str
    ) -> tuple[int | None, dict[str, Any] | None]:
        """Find the single entry that contains ``old_text``.

        Returns:
            ``(index, None)`` on success, or ``(None, error_response)`` when
            nothing matches or when several different entries match.
        """
        matches = [(index, entry) for index, entry in enumerate(entries) if old_text in entry]
        if not matches:
            return None, {"success": False, "error": f"No entry matched '{old_text}'."}
        # Several hits are only ambiguous when their texts differ.
        if len({entry for _, entry in matches}) > 1:
            return None, {
                "success": False,
                "error": f"Multiple entries matched '{old_text}'. Be more specific.",
                "matches": [
                    entry[:80] + ("..." if len(entry) > 80 else "")
                    for _, entry in matches
                ],
            }
        return matches[0][0], None

    def _reload_target(self, target: str) -> None:
        """Re-read the ``target`` bucket from disk; call while holding the lock.

        Reloading after the lock is acquired is what keeps this process from
        overwriting entries that another concurrent session has just written.
        """
        fresh = list(dict.fromkeys(self._read_file(self._path_for(target))))
        self._set_entries(target, fresh)

    def save_to_disk(self, target: str) -> None:
        """Persist the live entries of the ``target`` bucket to its file."""
        self.root.mkdir(parents=True, exist_ok=True)
        self._write_file(self._path_for(target), self._entries_for(target))

    def add(self, target: str, content: str) -> dict[str, Any]:
        """Append a new long-term memory entry.

        Rules, in order:

        1. Empty content is rejected.
        2. Content that would collide with the entry delimiter is rejected.
        3. Content that fails the safety scan is rejected.
        4. An exact duplicate is skipped (reported as success).
        5. Content that would exceed the character budget is rejected.
        6. Otherwise the entry is appended and written to disk immediately.

        Returns:
            A response dict with ``"success"`` and either the live bucket
            state or an ``"error"`` message.
        """
        content = content.strip()
        if not content:
            return {"success": False, "error": "Content cannot be empty."}
        delimiter_error = self._delimiter_error(content)
        if delimiter_error:
            return {"success": False, "error": delimiter_error}
        scan_error = scan_memory_content(content)
        if scan_error:
            return {"success": False, "error": scan_error}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)
            entries = self._entries_for(target)
            if content in entries:
                return self._success_response(target, "Entry already exists (skipped duplicate).")

            new_total = len(ENTRY_DELIMITER.join([*entries, content]))
            limit = self._char_limit(target)
            if new_total > limit:
                current = self._char_count(target)
                return {
                    "success": False,
                    "error": (
                        f"Memory at {current:,}/{limit:,} chars. "
                        f"Adding this entry ({len(content)} chars) would exceed the limit."
                    ),
                    "current_entries": list(entries),
                    "usage": f"{current:,}/{limit:,}",
                }

            entries.append(content)
            self._set_entries(target, entries)
            self.save_to_disk(target)
        return self._success_response(target, "Entry added.")

    def replace(self, target: str, old_text: str, new_content: str) -> dict[str, Any]:
        """Replace one existing entry with ``new_content``.

        The entry is found by substring match (``old_text in entry``) rather
        than by full text or an ID. If several different entries match, the
        caller is asked for a more specific fragment so that the wrong entry
        is never replaced.

        Returns:
            A response dict with ``"success"`` and either the live bucket
            state or an ``"error"`` message.
        """
        old_text = old_text.strip()
        new_content = new_content.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}
        if not new_content:
            return {
                "success": False,
                "error": "new_content cannot be empty. Use remove to delete entries.",
            }
        delimiter_error = self._delimiter_error(new_content)
        if delimiter_error:
            return {"success": False, "error": delimiter_error}
        scan_error = scan_memory_content(new_content)
        if scan_error:
            return {"success": False, "error": scan_error}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)
            entries = self._entries_for(target)
            index, error = self._locate_entry(entries, old_text)
            if error is not None:
                return error

            # Check the budget on a copy so a rejected edit leaves the live
            # state untouched.
            candidate_entries = list(entries)
            candidate_entries[index] = new_content
            new_total = len(ENTRY_DELIMITER.join(candidate_entries))
            limit = self._char_limit(target)
            if new_total > limit:
                return {
                    "success": False,
                    "error": (
                        f"Replacement would put memory at {new_total:,}/{limit:,} chars. "
                        "Shorten the new content or remove other entries first."
                    ),
                }

            entries[index] = new_content
            self._set_entries(target, entries)
            self.save_to_disk(target)
        return self._success_response(target, "Entry replaced.")

    def remove(self, target: str, old_text: str) -> dict[str, Any]:
        """Delete one existing entry, matched the same way as in ``replace``.

        Returns:
            A response dict with ``"success"`` and either the live bucket
            state or an ``"error"`` message.
        """
        old_text = old_text.strip()
        if not old_text:
            return {"success": False, "error": "old_text cannot be empty."}

        with self._file_lock(self._path_for(target)):
            self._reload_target(target)
            entries = self._entries_for(target)
            index, error = self._locate_entry(entries, old_text)
            if error is not None:
                return error

            entries.pop(index)
            self._set_entries(target, entries)
            self.save_to_disk(target)
        return self._success_response(target, "Entry removed.")

    def format_for_system_prompt(self, target: str) -> str | None:
        """Return the prompt block frozen by ``load_from_disk``.

        This is the frozen snapshot, not the live state: entries written
        later in the session do not appear here. Returns ``None`` when the
        snapshot for ``target`` is empty or ``target`` is unknown.
        """
        block = self._system_prompt_snapshot.get(target, "")
        return block or None

    def _success_response(self, target: str, message: str | None = None) -> dict[str, Any]:
        """Build the success payload shared by the write operations.

        The payload carries the live entries and usage so the caller can see
        what was just written even though the system prompt stays frozen.
        """
        current = self._char_count(target)
        limit = self._char_limit(target)
        percent = self._usage_percent(current, limit)
        payload: dict[str, Any] = {
            "success": True,
            "target": target,
            "entries": list(self._entries_for(target)),
            "entry_count": len(self._entries_for(target)),
            "usage": f"{percent}% — {current:,}/{limit:,} chars",
        }
        if message:
            payload["message"] = message
        return payload

    def _render_block(self, target: str, entries: list[str]) -> str:
        """Render ``entries`` as a block for injection into the system prompt.

        Returns an empty string when there are no entries.
        """
        if not entries:
            return ""
        body = ENTRY_DELIMITER.join(entries)
        current = len(body)
        limit = self._char_limit(target)
        percent = self._usage_percent(current, limit)
        if target == "user":
            header = f"USER PROFILE (who the user is) [{percent}% — {current:,}/{limit:,} chars]"
        else:
            header = f"MEMORY (your personal notes) [{percent}% — {current:,}/{limit:,} chars]"
        # Visible rule above and below the header; an empty literal here would
        # make the multiplication a no-op and drop the rule entirely.
        separator = "═" * 46
        return f"{separator}\n{header}\n{separator}\n{body}"

    @staticmethod
    def _read_file(path: Path) -> list[str]:
        """Read a memory file and split it into stripped, non-empty entries.

        No read lock is taken: writers replace the file atomically, so a
        reader sees either the complete old file or the complete new one.

        NOTE(review): a failed read is reported as "no entries". If that
        happens during a locked reload, the following write persists only the
        new state. Confirm this best-effort behavior is intended.
        """
        if not path.exists():
            return []
        try:
            raw = path.read_text(encoding="utf-8")
        except OSError:
            return []
        if not raw.strip():
            return []
        return [entry for entry in (item.strip() for item in raw.split(ENTRY_DELIMITER)) if entry]

    @staticmethod
    def _write_file(path: Path, entries: list[str]) -> None:
        """Write ``entries`` to ``path`` atomically.

        Opening ``path`` with mode ``"w"`` would truncate it first, so a
        concurrent reader could see an empty or half-written file. Instead:

        1. Create a temporary file in the same directory.
        2. Write the content and ``fsync`` it.
        3. Swap it into place with ``os.replace``.

        Raises:
            Whatever the write or replace raised, after the temporary file
            has been removed.
        """
        content = ENTRY_DELIMITER.join(entries) if entries else ""
        fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp", prefix=".mem_")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                handle.write(content)
                handle.flush()
                os.fsync(handle.fileno())
            os.replace(tmp_path, path)
        except BaseException:
            # Clean up the temp file on any failure, including interrupts,
            # then let the original exception propagate.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise