"""DLP analysis backends: remote LLM or deterministic local simulation."""

import json
import os
import re
from typing import Any

import openai
from dotenv import load_dotenv

# Loaded before the project imports below, presumably so those modules see
# values from .env if they read the environment at import time.
load_dotenv()

from .converter import IMAGE_SENTINEL
from .models import ActionClass, AttachmentResult, DLPResult, RiskLevel, ViolationType
from .policy import format_policy_for_prompt
from .simulator import simulate_analysis

OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "risk_level": {"type": "string", "enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"]},
        "risk_score": {"type": "integer", "minimum": 0, "maximum": 100},
        "violation_types": {
            "type": "array",
            "items": {
                "type": "string",
                "enum": [
                    "PII",
                    "FINANCIAL_DATA",
                    "SOURCE_CODE",
                    "REGULATORY_DOCUMENT",
                    "LEGAL_CONTRACT",
                    "PAYROLL_RECORD",
                    "CUSTOMER_LIST",
                    "INTERNAL_MEMO",
                    "NONE",
                ],
            },
        },
        "action": {"type": "string", "enum": ["PASS", "ALERT", "BLOCK"]},
        "summary": {"type": "string"},
        "evidence": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["risk_level", "risk_score", "violation_types", "action", "summary", "evidence"],
}

SYSTEM_PROMPT_TEMPLATE = """\
You are a Data Loss Prevention (DLP) analyst. Your task is to evaluate email content and attachments against the DLP policy below, then return a structured JSON decision.

## DLP Policy
{policy_json}

## Output Schema
Respond with valid JSON only (no markdown fences, no extra text) matching this schema:
{schema_json}

## Critical Rules
1. temperature=0: be deterministic and consistent.
2. evidence: include direct quotes (verbatim excerpts) from the actual email or attachment content that justify your decision. Do not paraphrase.
3. risk_score: assign 0-100. Use the full range — a customer CSV with 500 rows should score 95+, a casual internal memo scores 60-70.
4. action: MUST match the threshold — BLOCK if risk_score>=80, ALERT if risk_score>=40, PASS otherwise.
5. If no policy violations are found, set violation_types=["NONE"], risk_level="LOW", risk_score<40, action="PASS".
"""

# Completion budget for the JSON decision; replies cut off here usually fail
# to parse, so truncation is reported in processing_errors.
_MAX_TOKENS = 1024

# Extracts the body of a ```json ... ``` (or bare ```) fence, if present.
_FENCE_RE = re.compile(r"```(?:json)?\s*([\s\S]*?)\s*```")


def build_system_prompt() -> str:
    """Build the system prompt used for DLP analysis.

    Fills SYSTEM_PROMPT_TEMPLATE with the formatted DLP policy and the
    pretty-printed OUTPUT_SCHEMA.
    """
    return SYSTEM_PROMPT_TEMPLATE.format(
        policy_json=format_policy_for_prompt(),
        schema_json=json.dumps(OUTPUT_SCHEMA, indent=2),
    )


def _build_user_content(
    subject: str,
    sender: str,
    recipient: str,
    date: str,
    body_text: str,
    attachment_texts: list[tuple[str, str]],  # [(filename, text_or_sentinel)]
) -> list[dict[str, Any]]:
    """Build the user message content for the VLM.

    Returns a multimodal content list (OpenAI vision format). Text attachments
    are embedded as text blocks. Image attachments (text starting with
    IMAGE_SENTINEL) are inserted as image_url data-URL blocks so the VLM can
    see them directly. A malformed image payload becomes a text note instead of
    aborting the whole analysis.
    """
    header_block = "\n".join([
        "## Email Headers",
        f"Subject: {subject}",
        f"From: {sender}",
        f"To: {recipient}",
        f"Date: {date}",
        "",
        "## Email Body",
        body_text or "(empty body)",
    ])
    content: list[dict[str, Any]] = [{"type": "text", "text": header_block}]

    for filename, text in attachment_texts:
        text = text or ""
        if text.startswith(IMAGE_SENTINEL):
            # The payload after the sentinel is expected to be "<mime>:<base64>".
            # Split on the first colon only; the base64 alphabet has no ":".
            mime, sep, b64 = text[len(IMAGE_SENTINEL):].partition(":")
            if not (sep and mime and b64):
                content.append({
                    "type": "text",
                    "text": f"\n## Attachment: {filename} (image could not be decoded)",
                })
                continue
            content.append({"type": "text", "text": f"\n## Attachment: {filename} (image)"})
            content.append({
                "type": "image_url",
                "image_url": {"url": f"data:{mime};base64,{b64}"},
            })
        else:
            content.append({
                "type": "text",
                "text": f"\n## Attachment: {filename}\n{text or '(no extractable text)'}",
            })

    return content


def _parse_llm_response(content: str) -> Any:
    """Decode the JSON payload from a model reply.

    Strips a surrounding markdown code fence if present. If the text still does
    not parse, retries on the outermost ``{...}`` span to tolerate stray prose
    around the object. The result is normally a dict, but callers must check:
    any JSON value is returned as-is.

    Raises:
        json.JSONDecodeError: if no valid JSON can be recovered.
    """
    text = content.strip()
    fence_match = _FENCE_RE.search(text)
    if fence_match:
        text = fence_match.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(text[start:end + 1])


def _map_action(action_str: str) -> ActionClass:
    """Map the model's action string to ActionClass (case-insensitive), defaulting to ALERT."""
    mapping = {"PASS": ActionClass.PASS_, "ALERT": ActionClass.ALERT, "BLOCK": ActionClass.BLOCK}
    return mapping.get(str(action_str).strip().upper(), ActionClass.ALERT)


def _coerce_enum(enum_cls: type, raw: Any, default: Any) -> Any:
    """Return the member of enum_cls matching raw, else default.

    raw is upper-cased and matched first by value, then by member name.
    Non-string raw values always yield default.
    """
    if not isinstance(raw, str):
        return default
    key = raw.strip().upper()
    try:
        return enum_cls(key)
    except ValueError:
        return enum_cls.__members__.get(key, default)


def _as_list(raw: Any) -> list[Any]:
    """Wrap a lone string in a list; pass lists through; anything else becomes []."""
    if isinstance(raw, str):
        return [raw]
    return raw if isinstance(raw, list) else []


def _normalize_decision(parsed: dict[str, Any], errors: list[str]) -> dict[str, Any]:
    """Coerce the model's decision fields into DLPResult keyword arguments.

    Missing fields default to HIGH / 60 / [NONE] / ALERT / "" / [].
    Present-but-unusable values fall back to the same defaults instead of
    raising, and a note is appended to errors.
    """
    raw_level = parsed.get("risk_level", "HIGH")
    risk_level = _coerce_enum(RiskLevel, raw_level, None)
    if risk_level is None:
        errors.append(f"Unrecognized risk_level {raw_level!r}; defaulted to HIGH")
        risk_level = RiskLevel.HIGH

    raw_score = parsed.get("risk_score", 60)
    try:
        risk_score = int(float(raw_score))
    except (TypeError, ValueError, OverflowError):
        errors.append(f"Invalid risk_score {raw_score!r}; defaulted to 60")
        risk_score = 60
    # The schema bounds the score to 0-100; clamp anything outside it.
    risk_score = min(max(risk_score, 0), 100)

    violation_types = []
    unknown = []
    for item in _as_list(parsed.get("violation_types", ["NONE"])):
        member = _coerce_enum(ViolationType, item, None)
        if member is None:
            unknown.append(item)
        else:
            violation_types.append(member)
    if unknown:
        errors.append(f"Unrecognized violation_types {unknown!r} ignored")
    if not violation_types:
        violation_types = [ViolationType.NONE]

    raw_summary = parsed.get("summary", "")
    return {
        "risk_level": risk_level,
        "risk_score": risk_score,
        "violation_types": violation_types,
        "action": _map_action(parsed.get("action", "ALERT")),
        "summary": "" if raw_summary is None else str(raw_summary),
        "evidence": [str(e) for e in _as_list(parsed.get("evidence", [])) if e is not None],
    }


def _fallback_result(
    *,
    email_file: str,
    subject: str,
    sender: str,
    recipient: str,
    date: str,
    attachment_results: list[AttachmentResult],
    processing_errors: list[str],
    summary: str,
) -> DLPResult:
    """Build the fail-safe result used when the model's reply is unusable.

    Escalates to ALERT / HIGH / score 60 with no evidence instead of passing
    the email.
    """
    return DLPResult(
        email_file=email_file,
        subject=subject,
        sender=sender,
        recipient=recipient,
        date=date,
        risk_level=RiskLevel.HIGH,
        risk_score=60,
        violation_types=[ViolationType.NONE],
        action=ActionClass.ALERT,
        summary=summary,
        evidence=[],
        attachments=attachment_results,
        processing_errors=processing_errors,
    )


def analyze_email(
    email_file: str,
    subject: str,
    sender: str,
    recipient: str,
    date: str,
    body_text: str,
    attachment_texts: list[tuple[str, str]],
    attachment_results: list[AttachmentResult],
    processing_errors: list[str],
    endpoint: str = "http://localhost:8000/v1",
    model: str = "Qwen/Qwen3.5-35B-A3B",
    backend: str = "llm",
) -> DLPResult:
    """Analyze email content using an LLM or the deterministic simulator.

    Args:
        email_file, subject, sender, recipient, date: identify the email and
            are copied into the result.
        body_text: plain-text email body.
        attachment_texts: (filename, extracted text or IMAGE_SENTINEL payload)
            pairs sent to the model.
        attachment_results: per-attachment results copied into the result.
        processing_errors: caller's error list. It is appended to in place
            (parse/normalization notes) and attached to the result.
        endpoint, model: OpenAI-compatible base URL and model name. The
            non-empty environment variables OPENAI_BASE_URL / MODEL_NAME take
            precedence over these arguments.
        backend: "simulated" delegates to simulate_analysis; any other value
            queries the LLM.

    Returns:
        A DLPResult. If the model's reply cannot be decoded into a JSON object,
        a fail-safe ALERT/HIGH result is returned. Errors raised by the OpenAI
        client itself (e.g. connection failures) are not caught here.
    """
    if backend == "simulated":
        return simulate_analysis(
            email_file=email_file,
            subject=subject,
            sender=sender,
            recipient=recipient,
            date=date,
            body_text=body_text,
            attachment_texts=attachment_texts,
            attachment_results=attachment_results,
            processing_errors=processing_errors,
        )

    # Environment variables override the arguments when set. `or` (not a
    # getenv default) so an empty value such as `OPENAI_BASE_URL=` in .env
    # does not replace a usable argument with "".
    client = openai.OpenAI(
        base_url=os.getenv("OPENAI_BASE_URL") or endpoint,
        api_key=os.getenv("OPENAI_API_KEY") or "not-needed",
    )
    final_model = os.getenv("MODEL_NAME") or model

    user_content = _build_user_content(
        subject=subject,
        sender=sender,
        recipient=recipient,
        date=date,
        body_text=body_text,
        attachment_texts=attachment_texts,
    )

    response = client.chat.completions.create(
        model=final_model,
        messages=[
            {"role": "system", "content": build_system_prompt()},
            {"role": "user", "content": user_content},  # list[dict] for multimodal
        ],
        temperature=0.0,
        max_tokens=_MAX_TOKENS,
        # Server-specific: asks the chat template to disable the model's
        # thinking mode so the reply is just the JSON decision.
        extra_body={"chat_template_kwargs": {"enable_thinking": False}},
    )

    choice = response.choices[0]
    raw_content = choice.message.content or ""
    if getattr(choice, "finish_reason", None) == "length":
        processing_errors.append(f"LLM response truncated at max_tokens={_MAX_TOKENS}")

    envelope = {
        "email_file": email_file,
        "subject": subject,
        "sender": sender,
        "recipient": recipient,
        "date": date,
    }

    try:
        parsed = _parse_llm_response(raw_content)
    except json.JSONDecodeError as e:
        processing_errors.append(f"JSON parse error: {e}; raw={raw_content[:200]}")
        return _fallback_result(
            **envelope,
            attachment_results=attachment_results,
            processing_errors=processing_errors,
            summary="Analysis failed due to JSON parse error.",
        )

    if not isinstance(parsed, dict):
        processing_errors.append(
            f"LLM response was {type(parsed).__name__}, not a JSON object; raw={raw_content[:200]}"
        )
        return _fallback_result(
            **envelope,
            attachment_results=attachment_results,
            processing_errors=processing_errors,
            summary="Analysis failed: LLM response was not a JSON object.",
        )

    return DLPResult(
        **envelope,
        **_normalize_decision(parsed, processing_errors),
        attachments=attachment_results,
        processing_errors=processing_errors,
    )