98 lines
3.7 KiB
Python
98 lines
3.7 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from typing import Any
|
||
|
||
|
||
# Regex fragments that flag secret-like content.  They are kept as plain
# strings (not compiled) and are meant to be searched case-insensitively;
# a single hit on any entry is enough to treat the content as sensitive.
SECRET_PATTERNS = [
    # "key: value" / "key=value" style credentials.
    r"\bpassword\s*[:=]",
    r"\bapi[_-]?key\s*[:=]",
    r"\btoken\s*[:=]",
    r"\bsecret\s*[:=]",
    # HTTP bearer credentials with a reasonably long token body.
    r"\bbearer\s+[a-z0-9._\-]{12,}",
    # Session material.
    r"\bcookie\s*[:=]",
    r"\bsession[_ -]?id\s*[:=]",
    # Key material.
    r"-----BEGIN [A-Z ]*PRIVATE KEY-----",
    r"\bssh-rsa\s+[a-z0-9+/=]{40,}",
    # One-time passwords / verification codes.
    r"\bone[- ]?time (?:password|code)\b",
    r"\botp\s*[:=]?\s*\d{4,8}\b",
    # Chinese "verification code" followed by 4-8 digits.  No ``\b`` is used
    # here: CJK characters are word characters, so ``\b`` would fail whenever
    # the phrase sits inside running Chinese text.  ``(?!\d)`` keeps the
    # "not part of a longer number" guarantee instead.  Both the ASCII and the
    # full-width colon are accepted, as is an optional "是"/"为" ("is").
    r"验证码\s*(?:是|为)?\s*[::]?\s*\d{4,8}(?!\d)",
]
|
||
|
||
# A line that opens with a speaker label ("user:", "助手:", ...).  Both the
# ASCII colon and the full-width colon used in Chinese text are accepted.
CHAT_LINE_RE = re.compile(r"^\s*(user|assistant|system|用户|助手|模型|human|ai)\s*[::]", re.I)

# A line that contains an upper-case log level or starts with an
# ISO-like "YYYY-MM-DD hh:mm:ss" timestamp.  Intentionally case-sensitive.
LOG_LINE_RE = re.compile(r"\b(ERROR|WARN|INFO|DEBUG|TRACE)\b|^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}")

# Phrases that indicate a dump of model reasoning, in English or Chinese.
CHAIN_OF_THOUGHT_RE = re.compile(r"chain[- ]of[- ]thought|逐步推理|隐藏推理|internal reasoning", re.I)
|
||
|
||
|
||
def detect_secret(content: str) -> tuple[bool, str]:
    """Report whether *content* matches any entry of ``SECRET_PATTERNS``.

    Every pattern is searched case-insensitively.

    Returns:
        ``(True, "secret_like_content")`` on the first hit, otherwise
        ``(False, "")``.
    """
    found = any(re.search(candidate, content, re.I) for candidate in SECRET_PATTERNS)
    return (True, "secret_like_content") if found else (False, "")
|
||
|
||
|
||
def detect_raw_transcript(content: str) -> tuple[bool, str]:
    """Report whether *content* looks like a pasted chat transcript.

    The content is flagged when at least four non-blank lines match
    ``CHAT_LINE_RE``, or when it contains the phrase "完整原始对话" or
    (case-insensitively) "full transcript".

    Returns:
        ``(True, "raw_chat_transcript")`` when flagged, else ``(False, "")``.
    """
    flagged = (True, "raw_chat_transcript")

    # Count speaker-labelled lines, ignoring blank ones.
    speaker_turns = 0
    for raw_line in content.splitlines():
        if raw_line.strip() and CHAT_LINE_RE.search(raw_line):
            speaker_turns += 1
    if speaker_turns >= 4:
        return flagged

    mentions_dump = "完整原始对话" in content or "full transcript" in content.lower()
    return flagged if mentions_dump else (False, "")
|
||
|
||
|
||
def detect_large_log(
    content: str,
    *,
    max_chars: int = 4000,
    max_lines: int = 40,
    log_line_threshold: int = 8,
) -> tuple[bool, str]:
    """Report whether *content* is too large or looks like a raw log dump.

    Args:
        content: Text to inspect.
        max_chars: Content longer than this many characters is flagged.
        max_lines: Content with more than this many non-blank lines is flagged.
        log_line_threshold: Content is flagged once at least this many
            non-blank lines match ``LOG_LINE_RE``.

    Returns:
        ``(True, "large_or_raw_log")`` when any limit is hit, else
        ``(False, "")``.
    """
    flagged = (True, "large_or_raw_log")

    # Cheap size checks first, so oversized input never pays for the
    # per-line regex scan below.
    if len(content) > max_chars:
        return flagged

    lines = [line for line in content.splitlines() if line.strip()]
    if len(lines) > max_lines:
        return flagged

    log_lines = sum(1 for line in lines if LOG_LINE_RE.search(line))
    if log_lines >= log_line_threshold:
        return flagged
    return False, ""
|
||
|
||
|
||
def detect_low_value_memory(content: str) -> tuple[bool, str]:
    """Report whether *content* is too trivial to be worth storing.

    The text is whitespace-collapsed, stripped and lower-cased before the
    checks run, in this order:

    1. Anything containing a "stable signal" keyword (remember, preference,
       decision, constraint, or their Chinese equivalents) is always kept.
    2. An exact match against a small set of greetings/acknowledgements is
       rejected as ``"small_talk"``.
    3. Anything shorter than 12 characters is rejected as ``"too_short"``.

    The small-talk check runs before the length check on purpose: every
    small-talk phrase is shorter than 12 characters, so with the opposite
    order the ``"small_talk"`` reason could never be returned.

    Returns:
        ``(True, reason)`` when the content is low value, else ``(False, "")``.
    """
    normalized = re.sub(r"\s+", " ", content).strip().lower()

    stable_signal = re.search(
        r"记住|偏好|长期|决策|结论|约束|preference|remember|decision|constraint",
        normalized,
        re.I,
    )
    if stable_signal:
        return False, ""

    small_talk = {
        "hi",
        "hello",
        "thanks",
        "thank you",
        "ok",
        "好的",
        "谢谢",
        "你好",
        "收到",
        "再见",
    }
    if normalized in small_talk:
        return True, "small_talk"

    if len(normalized) < 12:
        return True, "too_short"
    return False, ""
|
||
|
||
|
||
def sanitize_memory_content(content: str) -> str:
    """Return *content* stripped, with credential-like substrings redacted.

    Three kinds of material are replaced:

    * PEM private-key blocks  -> ``<redacted-private-key>``
    * ``Bearer <token>``      -> ``Bearer <redacted>``
    * ``password|api_key|token|secret`` followed by ``:`` or ``=`` and a
      value -> ``<key>=<redacted>``

    The multi-word patterns (private key, bearer) run before the generic
    key/value pattern.  The key/value pattern only consumes the first
    whitespace-delimited word of the value, so running it first would turn
    ``token = Bearer abc...`` into ``token=<redacted> abc...`` and leave the
    real secret behind.  A quoted value is consumed up to its closing quote
    so that secrets containing spaces are fully removed.
    """
    sanitized = content.strip()

    # 1. Whole PEM blocks (may span lines, hence re.S).
    sanitized = re.sub(
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
        "<redacted-private-key>",
        sanitized,
        flags=re.I | re.S,
    )

    # 2. Bearer tokens.
    sanitized = re.sub(
        r"\bbearer\s+[a-z0-9._\-]{12,}",
        "Bearer <redacted>",
        sanitized,
        flags=re.I,
    )

    # 3. key[:=]value pairs.  The value is either a single-line quoted string
    #    (plus anything glued to the closing quote) or one run of
    #    non-whitespace characters.
    key_value = (
        r"\b(password|api[_-]?key|token|secret)\s*[:=]\s*"
        r"(?:\"[^\"\n]*\"\S*|'[^'\n]*'\S*|\S+)"
    )
    sanitized = re.sub(key_value, r"\1=<redacted>", sanitized, flags=re.I)

    return sanitized
|
||
|
||
|
||
def validate_memory_write(content: str, *, allow_low_value: bool = False) -> dict[str, Any]:
    """Decide whether *content* may be written to memory.

    Checks run in order and the first failure wins: empty content, secret
    detection, raw-transcript detection, large-log detection, chain-of-thought
    phrases, then (unless *allow_low_value* is true) the low-value check.

    Returns:
        A dict with keys ``"allowed"``, ``"reason"`` and
        ``"sanitized_content"``.  On rejection ``"sanitized_content"`` is
        ``""``; on success ``"reason"`` is ``"ok"`` and the content is passed
        through ``sanitize_memory_content``.
    """

    def _reject(why: str) -> dict[str, Any]:
        # Uniform shape for every refusal.
        return {"allowed": False, "reason": why, "sanitized_content": ""}

    if not (content and content.strip()):
        return _reject("empty_content")

    for detector in (detect_secret, detect_raw_transcript, detect_large_log):
        is_blocked, why = detector(content)
        if is_blocked:
            return _reject(why)

    if CHAIN_OF_THOUGHT_RE.search(content) is not None:
        return _reject("chain_of_thought")

    is_low_value, why = detect_low_value_memory(content)
    if is_low_value and not allow_low_value:
        return _reject(why)

    cleaned = sanitize_memory_content(content)
    return {"allowed": True, "reason": "ok", "sanitized_content": cleaned}
|