"""MIME email parsing: extract headers, body text, and attachments."""

import email
import email.errors
import email.header
import email.message
import email.policy
import tempfile
from dataclasses import dataclass, field
from pathlib import Path

from bs4 import BeautifulSoup


@dataclass
class ParsedAttachment:
    """One attachment that has been written to disk."""

    # Sanitized, de-duplicated file name (a single path component).
    filename: str
    # Location of the extracted file inside the email's temporary directory.
    path: Path
    # MIME content type reported by the originating part.
    content_type: str


@dataclass
class ParsedEmail:
    """Decoded headers, plain-text body, and extracted attachments of an email.

    Attachment files live in a temporary directory owned by this object.
    Call :meth:`cleanup` (or use the object in a ``with`` statement) to
    remove them once they are no longer needed.
    """

    subject: str
    sender: str
    recipient: str
    date: str
    body_text: str
    attachments: list[ParsedAttachment] = field(default_factory=list)
    # Owns the directory holding the attachment files; it must stay alive
    # for as long as the attachment paths are in use.
    _tempdir: tempfile.TemporaryDirectory | None = field(default=None, repr=False)

    def cleanup(self) -> None:
        """Delete the temporary attachment directory. Safe to call repeatedly."""
        if self._tempdir is not None:
            self._tempdir.cleanup()
            self._tempdir = None

    def __enter__(self) -> "ParsedEmail":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.cleanup()


def _decode_bytes(data: bytes, charset: str | None) -> str:
    """Decode *data* with *charset*, falling back to UTF-8.

    Emails routinely carry bogus charset labels (e.g. ``unknown-8bit``);
    ``errors="replace"`` does not protect against an unknown codec name,
    so that case is handled explicitly.
    """
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except (LookupError, ValueError):
        # LookupError: unknown or non-text codec; ValueError: malformed name.
        return data.decode("utf-8", errors="replace")


def _decode_rfc2047(value: str) -> str:
    """Decode RFC 2047 encoded words (e.g. ``=?Windows-1252?Q?...?=``)."""
    try:
        decoded_parts = email.header.decode_header(value)
    except email.errors.HeaderParseError:
        # Malformed base64 encoded-word: keep the raw text rather than fail.
        return value
    return "".join(
        _decode_bytes(chunk, charset) if isinstance(chunk, bytes) else chunk
        for chunk, charset in decoded_parts
    )


def _decode_header_value(value: str | None) -> str:
    """Return a header value as decoded, stripped text ("" when missing)."""
    if value is None:
        return ""
    # str() also flattens the Header objects compat32 may return.
    return _decode_rfc2047(str(value)).strip()


def _is_attachment(part: email.message.Message) -> bool:
    """Return True when the part's Content-Disposition is ``attachment``.

    Uses the parsed, lower-cased disposition token so the check is
    case-insensitive and is not fooled by a parameter value such as
    ``inline; filename="attachment.pdf"``.
    """
    disposition = part.get_content_disposition() or ""
    # startswith() tolerates a malformed header lacking the ';' separator.
    return disposition.startswith("attachment")


def _extract_body(msg: email.message.Message) -> str:
    """Walk MIME parts and extract the best plain-text body.

    All ``text/plain`` parts are preferred; if there are none, the
    ``text/html`` parts are converted to text. Returns "" when the
    message has no textual part.
    """
    plain_parts: list[str] = []
    html_parts: list[str] = []

    # A single-part message is its own body regardless of disposition;
    # attachments are only skipped inside multipart containers.
    skip_attachments = msg.is_multipart()

    for part in msg.walk():
        if skip_attachments and _is_attachment(part):
            continue
        ct = part.get_content_type()
        if ct not in ("text/plain", "text/html"):
            continue
        payload = part.get_payload(decode=True)
        if not payload:
            continue
        text = _decode_bytes(payload, part.get_content_charset())
        if ct == "text/plain":
            plain_parts.append(text)
        else:
            html_parts.append(text)

    if plain_parts:
        return "\n\n".join(plain_parts).strip()

    # Fall back to HTML → plain text via BeautifulSoup
    if html_parts:
        combined_html = "\n".join(html_parts)
        soup = BeautifulSoup(combined_html, "html.parser")
        return soup.get_text(separator="\n").strip()

    return ""


_IMAGE_CONTENT_TYPES = {
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/bmp",
    "image/tiff",
    "image/webp",
}

_IMAGE_EXTS = {
    "image/jpeg": ".jpg",
    "image/png": ".png",
    "image/gif": ".gif",
    "image/bmp": ".bmp",
    "image/tiff": ".tiff",
    "image/webp": ".webp",
}

# Used when a sender-supplied file name is empty after sanitization.
_FALLBACK_ATTACHMENT_NAME = "unnamed_attachment"


def _safe_filename(name: str) -> str:
    """Reduce an untrusted attachment name to a single safe path component.

    The name comes straight from the email, so it may contain directory
    separators (``../../x``, ``C:\\x``), absolute paths, or control
    characters. Only the final component is kept.
    """
    # Treat both separator styles as separators, whatever the host OS is.
    name = name.replace("\\", "/").rsplit("/", 1)[-1]
    # Drop NUL and other control characters that are invalid in file names.
    name = "".join(ch for ch in name if ch >= " " and ch != "\x7f").strip()
    if name in ("", ".", ".."):
        return _FALLBACK_ATTACHMENT_NAME
    return name


def _unique_name(name: str, seen: set[str]) -> str:
    """Return *name*, suffixed with ``_N`` if needed, and record it in *seen*.

    Comparison is case-insensitive so two attachments cannot overwrite
    each other on case-insensitive filesystems.
    """
    as_path = Path(name)
    stem, suffix = as_path.stem, as_path.suffix
    candidate = name
    counter = 1
    while candidate.casefold() in seen:
        candidate = f"{stem}_{counter}{suffix}"
        counter += 1
    seen.add(candidate.casefold())
    return candidate


def _collect_attachments(
    msg: email.message.Message, tmpdir: Path
) -> list[ParsedAttachment]:
    """Extract all attachment parts and write them to tmpdir.

    Every part that carries a file name is saved. Also captures inline
    images (CID-embedded) that have no filename, naming them after their
    Content-ID or a running counter. Parts without a decodable payload
    (multipart containers) are skipped.
    """
    attachments: list[ParsedAttachment] = []
    seen_names: set[str] = set()
    inline_image_counter = 0

    for part in msg.walk():
        content_type = part.get_content_type()
        filename = part.get_filename()

        if filename is None:
            # Unnamed parts are only kept when they are images.
            if content_type not in _IMAGE_CONTENT_TYPES:
                continue
            cid = str(part.get("Content-ID", "")).strip("<>").split("@")[0]
            ext = _IMAGE_EXTS.get(content_type, ".img")
            filename = f"inline_{cid or inline_image_counter}{ext}"
            inline_image_counter += 1

        payload = part.get_payload(decode=True)
        if payload is None:
            continue

        # Decode RFC 2047, then neutralise any path components before the
        # name is joined onto tmpdir.
        safe_name = _safe_filename(_decode_rfc2047(str(filename)))
        final_name = _unique_name(safe_name, seen_names)

        dest = tmpdir / final_name
        dest.write_bytes(payload)
        attachments.append(
            ParsedAttachment(
                filename=final_name,
                path=dest,
                content_type=content_type,
            )
        )

    return attachments


def parse_eml(eml_path: Path) -> ParsedEmail:
    """Parse an .eml file and return a ParsedEmail object.

    Attachments are written to a fresh temporary directory owned by the
    returned object. The caller is responsible for calling
    ``parsed_email.cleanup()`` when done, or for using the returned object
    as a context manager; keep it alive for as long as the attachment
    paths are needed.
    """
    with open(eml_path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=email.policy.compat32)

    subject = _decode_header_value(msg.get("Subject"))
    sender = _decode_header_value(msg.get("From"))
    recipient = _decode_header_value(msg.get("To"))
    date = _decode_header_value(msg.get("Date"))
    body_text = _extract_body(msg)

    tmpdir_obj = tempfile.TemporaryDirectory(prefix="email_dlp_")
    try:
        attachments = _collect_attachments(msg, Path(tmpdir_obj.name))
    except BaseException:
        # Do not leave a half-filled temp directory behind on failure.
        tmpdir_obj.cleanup()
        raise

    return ParsedEmail(
        subject=subject,
        sender=sender,
        recipient=recipient,
        date=date,
        body_text=body_text,
        attachments=attachments,
        _tempdir=tmpdir_obj,
    )