"""CLI entry point: preview and batch process .eml files through the DLP pipeline.""" import json from datetime import datetime from pathlib import Path from typing import Optional import typer from rich.console import Console from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeElapsedColumn from rich.table import Table from .analyzer import _build_user_content, analyze_email, build_system_prompt from .converter import IMAGE_SENTINEL, convert_attachment from .models import ActionClass, AttachmentResult, DLPResult from .parser import parse_eml from .policy_reviewer import review_corpus from .simulator import simulate_analysis app = typer.Typer(help="Email DLP — scan .eml files for data loss prevention policy violations.") console = Console() ACTION_COLORS = { ActionClass.BLOCK: "bold red", ActionClass.ALERT: "bold yellow", ActionClass.PASS_: "bold green", } RISK_COLORS = { "CRITICAL": "bold red", "HIGH": "red", "MEDIUM": "yellow", "LOW": "green", } def _process_single_email( eml_path: Path, endpoint: str, model: str, backend: str = "llm", ) -> DLPResult: """Parse, convert, and analyze one .eml file.""" processing_errors: list[str] = [] # 1. Parse MIME parsed = parse_eml(eml_path) try: # 2. Convert attachments attachment_texts: list[tuple[str, str]] = [] attachment_results: list[AttachmentResult] = [] for att in parsed.attachments: entries = convert_attachment(att.path, att.filename) for display_name, text, status in entries: if "truncated" in status: processing_errors.append( f"'{display_name}' truncated to 20000 chars" ) attachment_texts.append((display_name, text)) attachment_results.append( AttachmentResult( filename=display_name, content_type=att.content_type, extracted_text_chars=0 if text.startswith(IMAGE_SENTINEL) else len(text), conversion_status=status.split("|")[0], ) ) # 3. 
Analyze with LLM result = analyze_email( email_file=eml_path.name, subject=parsed.subject, sender=parsed.sender, recipient=parsed.recipient, date=parsed.date, body_text=parsed.body_text, attachment_texts=attachment_texts, attachment_results=attachment_results, processing_errors=processing_errors, endpoint=endpoint, model=model, backend=backend, ) finally: parsed.cleanup() return result def _simulate_single_email(eml_path: Path) -> DLPResult: """Parse, convert, and simulate one .eml file without an LLM.""" processing_errors: list[str] = [] parsed = parse_eml(eml_path) try: attachment_texts: list[tuple[str, str]] = [] attachment_results: list[AttachmentResult] = [] for att in parsed.attachments: entries = convert_attachment(att.path, att.filename) for display_name, text, status in entries: if "truncated" in status: processing_errors.append( f"'{display_name}' truncated to 20000 chars" ) attachment_texts.append((display_name, text)) attachment_results.append( AttachmentResult( filename=display_name, content_type=att.content_type, extracted_text_chars=0 if text.startswith(IMAGE_SENTINEL) else len(text), conversion_status=status.split("|")[0], ) ) result = simulate_analysis( email_file=eml_path.name, subject=parsed.subject, sender=parsed.sender, recipient=parsed.recipient, date=parsed.date, body_text=parsed.body_text, attachment_texts=attachment_texts, attachment_results=attachment_results, processing_errors=processing_errors, ) finally: parsed.cleanup() return result def _preview_single_email( eml_path: Path, include_system_prompt: bool = True, include_full_prompt: bool = False, ) -> dict: """Parse and convert one .eml file without calling the LLM.""" parsed = parse_eml(eml_path) try: attachments_preview: list[dict[str, object]] = [] attachment_texts: list[tuple[str, str]] = [] processing_errors: list[str] = [] for att in parsed.attachments: entries = convert_attachment(att.path, att.filename) for display_name, text, status in entries: is_image = 
text.startswith(IMAGE_SENTINEL) if "truncated" in status: processing_errors.append( f"'{display_name}' truncated to 20000 chars" ) attachment_texts.append((display_name, text)) attachments_preview.append( { "filename": display_name, "content_type": att.content_type, "conversion_status": status, "is_image": is_image, "extracted_text_chars": 0 if is_image else len(text), "text_preview": ( None if is_image else text[:500] ), "image_data_url_preview": ( None if not is_image else text[:120] + "..." if len(text) > 120 else text ), } ) llm_user_content = _build_user_content( subject=parsed.subject, sender=parsed.sender, recipient=parsed.recipient, date=parsed.date, body_text=parsed.body_text, attachment_texts=attachment_texts, ) llm_user_content_preview: list[dict[str, object]] = [] for block in llm_user_content: if block["type"] == "text": llm_user_content_preview.append( { "type": "text", "text_preview": str(block["text"])[:1000], "text_chars": len(str(block["text"])), } ) else: url = str(block["image_url"]["url"]) llm_user_content_preview.append( { "type": "image_url", "url_preview": url[:120] + ("..." 
if len(url) > 120 else ""), "url_chars": len(url), } ) preview_result = { "email_file": eml_path.name, "subject": parsed.subject, "sender": parsed.sender, "recipient": parsed.recipient, "date": parsed.date, "body_text_chars": len(parsed.body_text), "body_text_preview": parsed.body_text[:1000], "attachment_count": len(parsed.attachments), "attachments": attachments_preview, "processing_errors": processing_errors, "llm_user_content_preview": llm_user_content_preview, } if include_system_prompt: system_prompt = build_system_prompt() preview_result["llm_system_prompt_preview"] = { "text_preview": system_prompt[:2000], "text_chars": len(system_prompt), } if include_full_prompt: preview_result["llm_system_prompt"] = system_prompt if include_full_prompt: preview_result["llm_user_content"] = llm_user_content return preview_result finally: parsed.cleanup() @app.command() def analyze( input_dir: Path = typer.Option( Path("data"), "--input", "-i", help="Directory containing .eml files", ), output_dir: Optional[Path] = typer.Option( None, "--output", "-o", help="Directory to write JSON results (defaults to output/analyze-TIMESTAMP)", ), endpoint: str = typer.Option( "http://localhost:8000/v1", "--endpoint", help="vLLM OpenAI-compatible endpoint", ), model: str = typer.Option( "Qwen/Qwen3.5-35B-A3B", "--model", help="Model name to use for analysis", ), backend: str = typer.Option( "llm", "--backend", help="Analysis backend: 'llm' for API calls, 'simulated' for local deterministic analysis", ), summary: bool = typer.Option( False, "--summary", "-s", help="Print a summary table after processing", ), ) -> None: """Batch analyze all .eml files in INPUT_DIR and write JSON results to OUTPUT_DIR.""" if output_dir is None: timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") output_dir = Path("output") / f"analyze-{timestamp}" eml_files = sorted(input_dir.glob("*.eml")) if not eml_files: console.print(f"[red]No .eml files found in {input_dir}[/red]") raise typer.Exit(1) 
output_dir.mkdir(parents=True, exist_ok=True) results: list[DLPResult] = [] with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), console=console, ) as progress: task = progress.add_task("Analyzing emails...", total=len(eml_files)) for eml_path in eml_files: progress.update(task, description=f"[cyan]{eml_path.name[:50]}[/cyan]") try: result = _process_single_email(eml_path, endpoint, model, backend=backend) except Exception as e: console.print(f"[red]Error processing {eml_path.name}: {e}[/red]") progress.advance(task) continue # Write individual JSON result out_file = output_dir / (eml_path.stem + ".json") out_file.write_text(result.model_dump_json(indent=2)) results.append(result) progress.advance(task) # Write batch summary batch_summary = { "total": len(results), "by_action": { "BLOCK": sum(1 for r in results if r.action == ActionClass.BLOCK), "ALERT": sum(1 for r in results if r.action == ActionClass.ALERT), "PASS": sum(1 for r in results if r.action == ActionClass.PASS_), }, "by_risk": { level: sum(1 for r in results if r.risk_level.value == level) for level in ["CRITICAL", "HIGH", "MEDIUM", "LOW"] }, "emails": [ { "file": r.email_file, "subject": r.subject, "risk_level": r.risk_level.value, "risk_score": r.risk_score, "action": r.action.value, "violation_types": [v.value for v in r.violation_types], } for r in results ], } (output_dir / "batch_summary.json").write_text( json.dumps(batch_summary, indent=2) ) console.print(f"\n[bold green]Done![/bold green] Processed {len(results)}/{len(eml_files)} emails.") console.print(f"Results written to: [cyan]{output_dir}/[/cyan]") if summary and results: _print_summary_table(results) @app.command() def preview( input_dir: Path = typer.Option( Path("data"), "--input", "-i", help="Directory containing .eml files", ), output_dir: Optional[Path] = typer.Option( None, "--output", "-o", help="Optional directory to 
write preview JSON files", ), print_json: bool = typer.Option( False, "--print-json", help="Print preview JSON to stdout", ), include_system_prompt: bool = typer.Option( True, "--include-system-prompt/--no-system-prompt", help="Include the analyzer system prompt built from policy.py", ), include_full_prompt: bool = typer.Option( False, "--include-full-prompt", help="Include the full system prompt and full user content in JSON output", ), ) -> None: """Preview parsed email and converted attachment content before LLM analysis.""" eml_files = sorted(input_dir.glob("*.eml")) if not eml_files: console.print(f"[red]No .eml files found in {input_dir}[/red]") raise typer.Exit(1) if output_dir is not None: output_dir.mkdir(parents=True, exist_ok=True) previews: list[dict] = [] for eml_path in eml_files: try: preview_result = _preview_single_email( eml_path, include_system_prompt=include_system_prompt, include_full_prompt=include_full_prompt, ) except Exception as e: console.print(f"[red]Error previewing {eml_path.name}: {e}[/red]") continue previews.append(preview_result) if output_dir is not None: out_file = output_dir / f"{eml_path.stem}.preview.json" out_file.write_text(json.dumps(preview_result, indent=2)) if output_dir is not None: batch_file = output_dir / "batch_preview.json" batch_file.write_text(json.dumps(previews, indent=2)) console.print(f"[green]Preview JSON written to[/green] [cyan]{output_dir}[/cyan]") if print_json: console.print_json(json.dumps(previews, indent=2)) else: table = Table(title="Email Preview Results", show_lines=True) table.add_column("File", style="dim", max_width=45) table.add_column("Body Chars", justify="right") table.add_column("Attachments", justify="right") table.add_column("Errors", justify="right") for preview_result in previews: table.add_row( str(preview_result["email_file"]), str(preview_result["body_text_chars"]), str(preview_result["attachment_count"]), str(len(preview_result["processing_errors"])), ) console.print(table) 
@app.command()
def simulate(
    input_dir: Path = typer.Option(
        Path("data"),
        "--input",
        "-i",
        help="Directory containing .eml files",
    ),
    output_dir: Optional[Path] = typer.Option(
        None,
        "--output",
        "-o",
        help="Directory to write simulated JSON results (defaults to output/simulated-TIMESTAMP)",
    ),
    summary: bool = typer.Option(
        True,
        "--summary/--no-summary",
        help="Print a summary table after processing",
    ),
) -> None:
    """Batch simulate DLP analysis locally without calling an LLM."""
    if output_dir is None:
        # Timestamped default keeps successive runs from clobbering each other.
        stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        output_dir = Path("output") / f"simulated-{stamp}"

    eml_files = sorted(input_dir.glob("*.eml"))
    if not eml_files:
        console.print(f"[red]No .eml files found in {input_dir}[/red]")
        raise typer.Exit(1)

    output_dir.mkdir(parents=True, exist_ok=True)

    results: list[DLPResult] = []
    progress_columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(),
    )
    with Progress(*progress_columns, console=console) as progress:
        task = progress.add_task("Simulating email analysis...", total=len(eml_files))
        for eml_path in eml_files:
            progress.update(task, description=f"[cyan]{eml_path.name[:50]}[/cyan]")
            try:
                outcome = _simulate_single_email(eml_path)
            except Exception as e:
                # Report and move on; one bad email must not stop the batch.
                console.print(f"[red]Error processing {eml_path.name}: {e}[/red]")
            else:
                destination = output_dir / (eml_path.stem + ".json")
                destination.write_text(outcome.model_dump_json(indent=2))
                results.append(outcome)
            progress.advance(task)

    # Aggregate counts by action class and risk level, plus one row per email.
    batch_summary = {
        "total": len(results),
        "generator": "local-simulator",
        "model_label": "gpt-5.4-simulated",
        "by_action": {
            label: sum(1 for r in results if r.action is member)
            for label, member in (
                ("BLOCK", ActionClass.BLOCK),
                ("ALERT", ActionClass.ALERT),
                ("PASS", ActionClass.PASS_),
            )
        },
        "by_risk": {
            level: sum(1 for r in results if r.risk_level.value == level)
            for level in ("CRITICAL", "HIGH", "MEDIUM", "LOW")
        },
        "emails": [
            {
                "file": r.email_file,
                "subject": r.subject,
                "risk_level": r.risk_level.value,
                "risk_score": r.risk_score,
                "action": r.action.value,
                "violation_types": [v.value for v in r.violation_types],
            }
            for r in results
        ],
    }
    (output_dir / "batch_summary.json").write_text(json.dumps(batch_summary, indent=2))

    console.print(
        f"\n[bold green]Done![/bold green] Simulated {len(results)}/{len(eml_files)} emails."
    )
    console.print(f"Results written to: [cyan]{output_dir}/[/cyan]")

    if summary and results:
        _print_summary_table(results)


def _print_summary_table(results: list[DLPResult]) -> None:
    """Render one color-coded row per analyzed email on the console."""
    table = Table(title="Email DLP Analysis Results", show_lines=True)
    table.add_column("File", style="dim", max_width=45)
    table.add_column("Risk Level", justify="center")
    table.add_column("Score", justify="center")
    table.add_column("Action", justify="center")
    table.add_column("Violations", max_width=40)

    for item in results:
        risk = item.risk_level.value
        # Unknown levels/actions fall back to plain white styling.
        risk_style = RISK_COLORS.get(risk, "white")
        action_style = ACTION_COLORS.get(item.action, "white")
        table.add_row(
            item.email_file,
            f"[{risk_style}]{risk}[/{risk_style}]",
            str(item.risk_score),
            f"[{action_style}]{item.action.value}[/{action_style}]",
            ", ".join(v.value for v in item.violation_types),
        )

    console.print(table)