"""Policy-based DLP review derived from DLP_CATEGORIES in policy.py.

The review is purely keyword driven: every violation category owns a list of
signal phrases, a minimum number of distinct phrases that must be present, and
a base score.  ``review_corpus`` scans the e-mail headers, body and attachment
texts for those phrases and folds the hits into a single ``DLPResult``.
"""

from __future__ import annotations

import re
from collections import defaultdict
from functools import lru_cache

from .models import ActionClass, AttachmentResult, DLPResult, RiskLevel, ViolationType
from .policy import DLP_CATEGORIES  # noqa: F401  (kept importable; not read at runtime here)

# Keywords derived from DLP_CATEGORIES signal descriptions in policy.py.
# Per category:
#   keywords    - phrases searched for (case-insensitive) in the corpus
#   min_matches - number of distinct keywords required to raise the category
#   base_score  - score assigned when the threshold is reached
_POLICY_KEYWORDS: dict[ViolationType, dict] = {
    ViolationType.PII: {
        "keywords": [
            "full name",
            "email address",
            "social security",
            "ssn",
            "employee id",
            "phone number",
            "home address",
            "personal identifier",
            "date of birth",
        ],
        "min_matches": 2,
        "base_score": 55,
    },
    ViolationType.FINANCIAL_DATA: {
        "keywords": [
            "revenue",
            "ebitda",
            "projection",
            "forecast",
            "salary",
            "compensation plan",
            "invoice",
            "amount due",
            "payment terms",
            "budget",
            "gross margin",
            "sales data",
        ],
        "min_matches": 1,
        "base_score": 50,
    },
    ViolationType.SOURCE_CODE: {
        "keywords": [
            "copyright",
            "def ",
            "class ",
            "from __future__",
            "import ",
            "model weights",
            "api key",
            "api_key",
            "proprietary",
            "source code",
            "internal source",
        ],
        "min_matches": 2,
        "base_score": 85,
    },
    ViolationType.REGULATORY_DOCUMENT: {
        "keywords": [
            "cfpb",
            "gdpr",
            "sox",
            "compliance draft",
            "not for public release",
            "not for public distribution",
            "regulatory submission",
            "audit findings",
            "remediation plan",
            "internal compliance",
        ],
        "min_matches": 1,
        "base_score": 82,
    },
    ViolationType.LEGAL_CONTRACT: {
        "keywords": [
            "non-disclosure",
            "nondisclosure",
            "nda",
            "disclosing party",
            "receiving party",
            "confidentiality agreement",
            "settlement agreement",
            "executed contract",
            "signed contract",
        ],
        "min_matches": 1,
        "base_score": 65,
    },
    ViolationType.PAYROLL_RECORD: {
        "keywords": [
            "payroll",
            "pay period",
            "pay stub",
            "direct deposit",
            "routing number",
            "bank account",
            "net pay",
            "gross pay",
            "tax deductions",
            "year-to-date",
            "ytd",
            "compensation record",
        ],
        "min_matches": 1,
        "base_score": 88,
    },
    ViolationType.CUSTOMER_LIST: {
        "keywords": [
            "customer list",
            "customer_id",
            "customer id",
            "crm export",
            "prospect list",
            "top-tier prospect",
            "annual_sales",
            "company_name",
            "bulk export",
            "sales campaign",
        ],
        "min_matches": 2,
        "base_score": 85,
    },
    ViolationType.INTERNAL_MEMO: {
        "keywords": [
            "internal only",
            "internal use only",
            "do not distribute",
            "not for external",
            "office of the ceo",
            "organizational priorities",
            "growth roadmap",
            "strictly confidential",
            "internal policy document",
            "headcount",
        ],
        "min_matches": 1,
        "base_score": 55,
    },
}

# Recipient substrings that trigger the context boost in ``review_corpus``.
_EXTERNAL_MAIL_DOMAINS: tuple[str, ...] = (
    "gmail.com",
    "yahoo.com",
    "outlook.com",
    "hotmail.com",
)


def _normalize(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends."""
    return re.sub(r"\s+", " ", text).strip()


@lru_cache(maxsize=256)
def _keyword_pattern(keyword: str) -> re.Pattern[str]:
    """Compile the case-insensitive search pattern for one policy keyword.

    * The keyword must not be preceded by a letter or digit, so short acronyms
      such as "nda" no longer fire inside "Monday" or "agenda".  The end is
      left open on purpose so inflected forms ("invoices", "NDAs") still hit.
    * Whitespace between the words of a phrase matches any whitespace run, so
      a phrase broken by a line wrap or a double space is still found.
    * A keyword written with trailing whitespace ("def ", "import ") keeps
      requiring a whitespace character after it.
    """
    words = keyword.split()
    if not words:
        # An empty keyword would otherwise match at every position.
        return re.compile(r"(?!)")
    body = r"\s+".join(re.escape(word) for word in words)
    if keyword[-1].isspace():
        body += r"\s"
    return re.compile(rf"(?<![a-z0-9]){body}", re.IGNORECASE)


def _find_evidence(text: str, keyword: str) -> str | None:
    """Return a normalized context snippet around the first hit of *keyword*.

    The snippet spans up to 60 characters before and 100 characters after the
    match.  Returns ``None`` when *keyword* does not occur in *text*.  The same
    pattern decides both "is the keyword present" and "where is the evidence",
    so the snippet always shows the text that triggered the match.
    """
    match = _keyword_pattern(keyword).search(text)
    if match is None:
        return None
    start = max(0, match.start() - 60)
    end = min(len(text), match.end() + 100)
    return _normalize(text[start:end])


def _risk_level_from_score(score: int) -> RiskLevel:
    """Map a score to a risk level: >=80 CRITICAL, >=60 HIGH, >=40 MEDIUM, else LOW."""
    if score >= 80:
        return RiskLevel.CRITICAL
    if score >= 60:
        return RiskLevel.HIGH
    if score >= 40:
        return RiskLevel.MEDIUM
    return RiskLevel.LOW


def _action_from_score(score: int) -> ActionClass:
    """Map a score to an action: >=80 BLOCK, >=40 ALERT, else PASS_."""
    if score >= 80:
        return ActionClass.BLOCK
    if score >= 40:
        return ActionClass.ALERT
    return ActionClass.PASS_


def _is_personal_webmail(recipient: str) -> bool:
    """Return True when *recipient* contains one of ``_EXTERNAL_MAIL_DOMAINS``."""
    recipient_lower = (recipient or "").lower()
    return any(domain in recipient_lower for domain in _EXTERNAL_MAIL_DOMAINS)


def review_corpus(
    email_file: str,
    subject: str,
    sender: str,
    recipient: str,
    date: str,
    body_text: str,
    attachment_texts: list[tuple[str, str]],
    attachment_results: list[AttachmentResult],
    processing_errors: list[str],
) -> DLPResult:
    """Judge an email using DLP_CATEGORIES signals from policy.py.

    Args:
        email_file: Identifier of the reviewed e-mail, copied into the result.
        subject, sender, recipient: Header values; they are part of the
            searched corpus and are copied into the result.
        date: Copied into the result unchanged; not searched.
        body_text: E-mail body; part of the searched corpus.
        attachment_texts: ``(filename, text)`` pairs; both the filename and
            the text are searched.
        attachment_results: Passed through to the result unchanged.
        processing_errors: Passed through to the result unchanged.

    Returns:
        A ``DLPResult``.  When no category reaches its ``min_matches``
        threshold the result is LOW / score 12 / ``ViolationType.NONE`` /
        PASS_.  Otherwise it carries the up-to-three highest scoring
        categories, at most five evidence snippets, and the risk level and
        action derived from the combined score.

    Scoring:
        category score = base_score + min(12, 3 * (distinct hits - 1))
                         + 6 if the recipient contains a webmail domain,
                         capped at 99
        overall score  = best category score
                         + min(10, 3 * (flagged categories - 1)), capped at 99
    """
    # Build the full text corpus; empty parts are dropped.
    parts = [
        f"Subject: {subject}",
        f"From: {sender}",
        f"To: {recipient}",
        body_text,
    ]
    for filename, text in attachment_texts:
        parts.append(f"Attachment: {filename}")
        parts.append(text)
    raw = "\n".join(p for p in parts if p)

    # Context boost: the recipient is the same for every category, decide once.
    recipient_boost = 6 if _is_personal_webmail(recipient) else 0

    evidence_map: dict[ViolationType, list[str]] = defaultdict(list)
    score_map: dict[ViolationType, int] = {}

    for vtype, rule in _POLICY_KEYWORDS.items():
        match_count = 0
        for kw in rule["keywords"]:
            snippet = _find_evidence(raw, kw)
            if snippet is None:
                continue
            # Each keyword counts once, however often it occurs.
            match_count += 1
            if snippet and snippet not in evidence_map[vtype]:
                evidence_map[vtype].append(snippet)

        if match_count < rule["min_matches"]:
            continue

        score = rule["base_score"] + min(12, (match_count - 1) * 3) + recipient_boost
        score_map[vtype] = min(99, score)

    if not score_map:
        return DLPResult(
            email_file=email_file,
            subject=subject,
            sender=sender,
            recipient=recipient,
            date=date,
            risk_level=RiskLevel.LOW,
            risk_score=12,
            violation_types=[ViolationType.NONE],
            action=ActionClass.PASS_,
            summary="Policy review found no DLP category signals in this email.",
            evidence=[],
            attachments=attachment_results,
            processing_errors=processing_errors,
        )

    # Highest score first; the sort is stable, so ties keep table order.
    ranked = sorted(score_map.items(), key=lambda item: item[1], reverse=True)
    violation_types = [vt for vt, _ in ranked[:3]]

    risk_score = ranked[0][1]
    if len(ranked) > 1:
        # Several categories firing together raises the overall score.
        risk_score = min(99, risk_score + min(10, 3 * (len(ranked) - 1)))

    # Up to two snippets per reported category, five in total.
    evidence: list[str] = []
    for vt in violation_types:
        evidence.extend(evidence_map[vt][:2])
    evidence = evidence[:5]

    risk_level = _risk_level_from_score(risk_score)
    action = _action_from_score(risk_score)

    violation_labels = ", ".join(v.value for v in violation_types)
    summary = (
        f"Policy review flagged {violation_labels} with {risk_level.value} risk "
        f"(score {risk_score}) using DLP_CATEGORIES signals from policy.py."
    )

    return DLPResult(
        email_file=email_file,
        subject=subject,
        sender=sender,
        recipient=recipient,
        date=date,
        risk_level=risk_level,
        risk_score=risk_score,
        violation_types=violation_types,
        action=action,
        summary=summary,
        evidence=evidence,
        attachments=attachment_results,
        processing_errors=processing_errors,
    )