"""Deterministic local simulator for DLP analysis.""" from __future__ import annotations import re from collections import defaultdict from .converter import IMAGE_SENTINEL from .models import ActionClass, AttachmentResult, DLPResult, RiskLevel, ViolationType _CATEGORY_RULES: dict[ViolationType, dict[str, object]] = { ViolationType.PII: { "keywords": [ "personally identifiable information", " pii", "employee id", "account ending", "direct deposit", "customer_id", "first_name", "last_name", ], "base_score": 30, "min_matches": 2, }, ViolationType.FINANCIAL_DATA: { "keywords": [ "financial forecast", "revenue", "ebitda", "gross margin", "margin efficiency", "sales data", "annual_sales_usd", "invoice", "amount due", "payment instructions", "ach", "budget", ], "base_score": 42, }, ViolationType.SOURCE_CODE: { "keywords": [ "source code", "api key", "model weights", "from __future__ import annotations", "def ", "class ", "@dataclass", ], "base_score": 88, "min_matches": 2, }, ViolationType.REGULATORY_DOCUMENT: { "keywords": [ "regulatory document", "regulatory submission", "cfpb", "compliance report", "not for public release", "draft regulatory", "prepared by: legal & compliance team", ], "base_score": 84, }, ViolationType.LEGAL_CONTRACT: { "keywords": [ "nondisclosure agreement", "non-disclosure agreement", "executed nda", "disclosing party", "receiving party", ], "base_score": 62, "min_matches": 1, }, ViolationType.PAYROLL_RECORD: { "keywords": [ "payroll", "pay stub", "compensation record", "gross:", "net pay", "tax deductions", "pay period", "direct deposit", "employee id", ], "base_score": 90, }, ViolationType.CUSTOMER_LIST: { "keywords": [ "customer list", "prospects", "crm export", "raw export", "customer_id", "company_name", "annual_sales_usd", "top-tier prospects", ], "base_score": 86, "min_matches": 2, }, ViolationType.INTERNAL_MEMO: { "keywords": [ "internal use only", "internal memo", "do not distribute externally", "office of the ceo", "organizational priorities", "growth roadmap", "internal policy document", "not for public distribution", "strictly confidential", ], "base_score": 52, "min_matches": 1, }, } _RISK_LEVELS = [ (80, RiskLevel.CRITICAL), (60, RiskLevel.HIGH), (40, RiskLevel.MEDIUM), (0, RiskLevel.LOW), ] def _normalize_text(text: str) -> str: return re.sub(r"\s+", " ", text).strip() def _build_corpus( subject: str, sender: str, recipient: str, body_text: str, attachment_texts: list[tuple[str, str]], ) -> tuple[str, str]: text_chunks = [ f"Subject: {subject}", f"From: {sender}", f"To: {recipient}", body_text, ] for filename, text in attachment_texts: text_chunks.append(f"Attachment: {filename}") # Skip binary image data — base64 payloads produce false keyword matches if not text.startswith(IMAGE_SENTINEL): text_chunks.append(text) raw = "\n".join(chunk for chunk in text_chunks if chunk) return raw, raw.lower() def _find_evidence(text: str, keyword: str) -> str | None: pattern = re.escape(keyword.strip()) match = re.search(pattern, text, flags=re.IGNORECASE) if not match: return None start = max(0, match.start() - 60) end = min(len(text), match.end() + 100) return _normalize_text(text[start:end]) def _collect_matches( raw_text: str, lower_text: str, ) -> tuple[dict[ViolationType, list[str]], dict[ViolationType, int]]: evidence_map: dict[ViolationType, list[str]] = defaultdict(list) score_map: dict[ViolationType, int] = {} for violation_type, rule in _CATEGORY_RULES.items(): keywords = rule["keywords"] base_score = int(rule["base_score"]) min_matches = int(rule.get("min_matches", 1)) match_count = 0 for keyword in keywords: # Use word boundaries to avoid substring false positives (e.g. "ach" in "attached") pattern = r"\b" + re.escape(keyword) + r"\b" if re.search(pattern, lower_text): match_count += 1 evidence = _find_evidence(raw_text, keyword) if evidence and evidence not in evidence_map[violation_type]: evidence_map[violation_type].append(evidence) if match_count < min_matches: continue score = base_score + min(12, (match_count - 1) * 4) score_map[violation_type] = min(score, 99) return evidence_map, score_map def _apply_context_boosts( subject: str, recipient: str, attachment_texts: list[tuple[str, str]], score_map: dict[ViolationType, int], ) -> None: subject_lower = subject.lower() recipient_lower = recipient.lower() if any(domain in recipient_lower for domain in ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com"]): for violation_type in list(score_map): score_map[violation_type] = min(99, score_map[violation_type] + 6) if "urgent" in subject_lower or "confidential" in subject_lower: for violation_type in list(score_map): score_map[violation_type] = min(99, score_map[violation_type] + 2) attachment_names = " ".join(filename.lower() for filename, _ in attachment_texts) if ".csv" in attachment_names and ViolationType.CUSTOMER_LIST in score_map: score_map[ViolationType.CUSTOMER_LIST] = min( 99, score_map[ViolationType.CUSTOMER_LIST] + 6 ) if ".py" in attachment_names and ViolationType.SOURCE_CODE in score_map: score_map[ViolationType.SOURCE_CODE] = min( 99, score_map[ViolationType.SOURCE_CODE] + 4 ) def _risk_level_from_score(risk_score: int) -> RiskLevel: for threshold, risk_level in _RISK_LEVELS: if risk_score >= threshold: return risk_level return RiskLevel.LOW def _action_from_score(risk_score: int) -> ActionClass: if risk_score >= 80: return ActionClass.BLOCK if risk_score >= 40: return ActionClass.ALERT return ActionClass.PASS_ def _build_summary( violation_types: list[ViolationType], risk_level: RiskLevel, risk_score: int, ) -> str: if violation_types == [ViolationType.NONE]: return "No strong DLP indicators were found in the email body or converted attachments." labels = ", ".join(v.value for v in violation_types) return ( f"Simulated DLP review flagged {labels} with {risk_level.value} risk " f"(score {risk_score}) based on the email body and extracted attachment content." ) def simulate_analysis( email_file: str, subject: str, sender: str, recipient: str, date: str, body_text: str, attachment_texts: list[tuple[str, str]], attachment_results: list[AttachmentResult], processing_errors: list[str], ) -> DLPResult: """Predict a DLP result locally without calling an LLM.""" raw_text, lower_text = _build_corpus( subject=subject, sender=sender, recipient=recipient, body_text=body_text, attachment_texts=attachment_texts, ) evidence_map, score_map = _collect_matches(raw_text, lower_text) _apply_context_boosts(subject, recipient, attachment_texts, score_map) if not score_map: violation_types = [ViolationType.NONE] risk_score = 18 evidence: list[str] = [] else: ranked = sorted(score_map.items(), key=lambda item: item[1], reverse=True) violation_types = [violation for violation, _ in ranked[:3]] risk_score = ranked[0][1] if len(ranked) > 1: risk_score = min(99, risk_score + min(10, 3 * (len(ranked) - 1))) evidence = [] for violation_type in violation_types: evidence.extend(evidence_map.get(violation_type, [])[:2]) evidence = evidence[:5] risk_level = _risk_level_from_score(risk_score) action = _action_from_score(risk_score) return DLPResult( email_file=email_file, subject=subject, sender=sender, recipient=recipient, date=date, risk_level=risk_level, risk_score=risk_score, violation_types=violation_types, action=action, summary=_build_summary(violation_types, risk_level, risk_score), evidence=evidence, attachments=attachment_results, processing_errors=processing_errors, )