Files
memory-gateway/plugins/memory-gateway-agent/memory_gateway_plugin/safety.py

98 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
from typing import Any
# Regex fragments that mark text as containing credential-like material.
# They are plain strings (not compiled) and are intended to be searched
# case-insensitively.
SECRET_PATTERNS = [
    r"\bpassword\s*[:=]",
    r"\bapi[_-]?key\s*[:=]",
    r"\btoken\s*[:=]",
    r"\bsecret\s*[:=]",
    r"\bbearer\s+[a-z0-9._\-]{12,}",
    r"\bcookie\s*[:=]",
    r"\bsession[_ -]?id\s*[:=]",
    r"-----BEGIN [A-Z ]*PRIVATE KEY-----",
    r"\bssh-rsa\s+[a-z0-9+/=]{40,}",
    r"\bone[- ]?time (?:password|code)\b",
    r"\botp\s*[:=]?\s*\d{4,8}\b",
    # Chinese "verification code" followed by 4-8 digits.  CJK ideographs are
    # word characters, so a ``\b`` next to them never matches inside running
    # Chinese text; the pattern therefore has no leading ``\b`` and ends with
    # ``(?!\d)`` instead of ``\b``.  Both the ASCII colon and the full-width
    # colon (U+FF1A, written as an escape to avoid an ambiguous literal) are
    # accepted as the optional separator.
    r"验证码\s*[:\uff1a]?\s*\d{4,8}(?!\d)",
]
# A line that opens with a chat-role label followed by a colon.  Both the
# ASCII colon and the full-width colon (U+FF1A, written as an escape so the
# source carries no ambiguous character) are accepted, since the Chinese role
# labels are typically followed by the full-width form.  Group 1 is the label.
CHAT_LINE_RE = re.compile(
    r"^\s*(user|assistant|system|用户|助手|模型|human|ai)\s*[:\uff1a]",
    re.I,
)

# A line that looks like log output: it contains an upper-case level keyword
# or starts with a ``YYYY-MM-DD HH:MM:SS`` / ``YYYY-MM-DDTHH:MM:SS`` timestamp.
# ``WARN(?:ING)?`` is required because ``WARN\b`` cannot match ``WARNING``;
# CRITICAL and FATAL are included as further common level names.  Matching is
# deliberately case-sensitive.  Group 1 is the level keyword when present.
LOG_LINE_RE = re.compile(
    r"\b(ERROR|WARN(?:ING)?|INFO|DEBUG|TRACE|CRITICAL|FATAL)\b"
    r"|^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}"
)

# Phrases (English and Chinese) that refer to chain-of-thought / hidden
# reasoning content; matched case-insensitively.
CHAIN_OF_THOUGHT_RE = re.compile(
    r"chain[- ]of[- ]thought|逐步推理|隐藏推理|internal reasoning",
    re.I,
)
def detect_secret(content: str) -> tuple[bool, str]:
    """Check *content* against every entry of ``SECRET_PATTERNS``.

    Each pattern is searched case-insensitively.

    Returns:
        ``(True, "secret_like_content")`` if at least one pattern matches,
        otherwise ``(False, "")``.
    """
    matched = any(re.search(expr, content, re.I) for expr in SECRET_PATTERNS)
    if matched:
        return True, "secret_like_content"
    return False, ""
def detect_raw_transcript(content: str) -> tuple[bool, str]:
    """Decide whether *content* looks like a pasted chat transcript.

    The text is flagged when at least four non-blank lines match
    ``CHAT_LINE_RE``, or when it contains one of the literal markers
    "完整原始对话" or (case-insensitively) "full transcript".

    Returns:
        ``(True, "raw_chat_transcript")`` when flagged, else ``(False, "")``.
    """
    role_prefixed = 0
    for raw_line in content.splitlines():
        # Blank lines are ignored, as are lines without a role prefix.
        if raw_line.strip() and CHAT_LINE_RE.search(raw_line):
            role_prefixed += 1

    has_marker = "完整原始对话" in content or "full transcript" in content.lower()
    if role_prefixed >= 4 or has_marker:
        return True, "raw_chat_transcript"
    return False, ""
def detect_large_log(content: str) -> tuple[bool, str]:
    """Decide whether *content* is too large or looks like raw log output.

    The text is flagged when any of the following holds:

    * it is longer than 4000 characters;
    * it has more than 40 non-blank lines;
    * at least 8 non-blank lines match ``LOG_LINE_RE``.

    Returns:
        ``(True, "large_or_raw_log")`` when flagged, else ``(False, "")``.
    """
    verdict = (True, "large_or_raw_log")

    if len(content) > 4000:
        return verdict

    non_blank = [row for row in content.splitlines() if row.strip()]
    if len(non_blank) > 40:
        return verdict

    log_like = len([row for row in non_blank if LOG_LINE_RE.search(row)])
    if log_like >= 8:
        return verdict

    return False, ""
# Whole-message greetings / acknowledgements that carry no lasting information.
_SMALL_TALK = frozenset(
    {
        "hi",
        "hello",
        "thanks",
        "thank you",
        "ok",
        "好的",
        "谢谢",
        "你好",
        "收到",
        "再见",
    }
)


def detect_low_value_memory(content: str) -> tuple[bool, str]:
    """Decide whether *content* is too trivial to be worth storing.

    The text is whitespace-collapsed, stripped and lower-cased first.

    * If it contains a "stable signal" keyword (remember / preference /
      decision / constraint or one of the Chinese equivalents in the
      pattern below) it is never considered low value.
    * If the whole normalized text is a known greeting or acknowledgement,
      the reason is ``"small_talk"``.
    * Otherwise, if it is shorter than 12 characters, the reason is
      ``"too_short"``.

    The small-talk check runs before the length check: every small-talk
    phrase is itself shorter than 12 characters, so with the opposite order
    the ``"small_talk"`` reason could never be produced.

    Returns:
        ``(True, reason)`` when the text is low value, else ``(False, "")``.
    """
    normalized = re.sub(r"\s+", " ", content).strip().lower()

    stable_signal = re.search(
        r"记住|偏好|长期|决策|结论|约束|preference|remember|decision|constraint",
        normalized,
        re.I,
    )
    if stable_signal:
        return False, ""

    if normalized in _SMALL_TALK:
        return True, "small_talk"
    if len(normalized) < 12:
        return True, "too_short"
    return False, ""
def sanitize_memory_content(content: str) -> str:
    """Return *content* stripped of surrounding whitespace with secrets redacted.

    Redactions applied, all case-insensitively:

    * ``cookie`` followed by ``:`` or ``=``: the rest of that line is replaced,
      because a cookie header usually carries several ``name=value`` pairs.
    * ``password`` / ``api_key`` / ``token`` / ``secret`` / ``session id``
      followed by ``:`` or ``=``: the next whitespace-free run is replaced.
      The key keeps its original spelling; the separator becomes ``=``.
    * ``Bearer <token>`` with a token of 12+ characters.
    * PEM private-key blocks.  If the END marker is missing (for example a
      truncated paste) everything from the BEGIN marker to the end of the
      text is redacted rather than left in place.

    Args:
        content: Text to sanitize.

    Returns:
        The sanitized text.
    """
    sanitized = content.strip()
    sanitized = re.sub(
        r"\b(cookie)\s*[:=]\s*[^\r\n]+",
        r"\1=<redacted>",
        sanitized,
        flags=re.I,
    )
    sanitized = re.sub(
        r"\b(password|api[_-]?key|token|secret|session[_ -]?id)\s*[:=]\s*\S+",
        r"\1=<redacted>",
        sanitized,
        flags=re.I,
    )
    sanitized = re.sub(
        r"\bbearer\s+[a-z0-9._\-]{12,}",
        "Bearer <redacted>",
        sanitized,
        flags=re.I,
    )
    # Lazy body: stops at the first END marker, or at end of text if none.
    sanitized = re.sub(
        r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?"
        r"(?:-----END [A-Z ]*PRIVATE KEY-----|\Z)",
        "<redacted-private-key>",
        sanitized,
        flags=re.I | re.S,
    )
    return sanitized
def validate_memory_write(content: str, *, allow_low_value: bool = False) -> dict[str, Any]:
    """Run every safety check on *content* and report whether it may be stored.

    Checks run in this order and the first failure wins:

    1. empty / whitespace-only content -> ``"empty_content"``
    2. ``detect_secret``, ``detect_raw_transcript``, ``detect_large_log``
       (the reason is whatever the detector returns)
    3. ``CHAIN_OF_THOUGHT_RE`` match -> ``"chain_of_thought"``
    4. ``detect_low_value_memory`` (skipped as a blocker when
       *allow_low_value* is true)

    Args:
        content: Candidate memory text.
        allow_low_value: When true, low-value content is not rejected.

    Returns:
        A dict with keys ``"allowed"``, ``"reason"`` and
        ``"sanitized_content"``.  On rejection ``sanitized_content`` is the
        empty string; on success ``reason`` is ``"ok"`` and
        ``sanitized_content`` is the output of ``sanitize_memory_content``.
    """

    def _rejected(why: str) -> dict[str, Any]:
        return {"allowed": False, "reason": why, "sanitized_content": ""}

    if not (content and content.strip()):
        return _rejected("empty_content")

    for detector in (detect_secret, detect_raw_transcript, detect_large_log):
        flagged, why = detector(content)
        if flagged:
            return _rejected(why)

    if CHAIN_OF_THOUGHT_RE.search(content):
        return _rejected("chain_of_thought")

    flagged, why = detect_low_value_memory(content)
    if flagged and not allow_low_value:
        return _rejected(why)

    return {
        "allowed": True,
        "reason": "ok",
        "sanitized_content": sanitize_memory_content(content),
    }