200 lines
6.3 KiB
Python
200 lines
6.3 KiB
Python
"""MIME email parsing: extract headers, body text, and attachments."""
|
|
|
|
import email
|
|
import email.policy
|
|
import tempfile
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
from bs4 import BeautifulSoup
|
|
|
|
|
|
@dataclass
class ParsedAttachment:
    """A single attachment that has been extracted from an email to disk."""

    # Name under which the attachment was saved.
    filename: str
    # Location of the saved attachment bytes.
    path: Path
    # MIME content type of the originating message part (e.g. "image/png").
    content_type: str
|
|
|
|
|
|
@dataclass
class ParsedEmail:
    """Decoded headers, body text, and extracted attachments of one email.

    The attachment files live inside the temporary directory held in
    ``_tempdir``. They are deleted when :meth:`cleanup` runs, so keep this
    object alive (and un-cleaned) for as long as the attachment paths are
    needed.

    Instances also work as a context manager that calls :meth:`cleanup`
    on exit, including when the body raises::

        with parse_eml(path) as parsed:
            ...
    """

    subject: str
    sender: str
    recipient: str
    date: str
    body_text: str
    attachments: list[ParsedAttachment] = field(default_factory=list)
    # Owner of the directory that holds the attachment files; hidden from repr.
    _tempdir: tempfile.TemporaryDirectory | None = field(default=None, repr=False)

    def __enter__(self) -> "ParsedEmail":
        """Return the instance itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Remove the attachment directory; exceptions are not suppressed."""
        self.cleanup()

    def cleanup(self) -> None:
        """Delete the temporary attachment directory; safe to call repeatedly."""
        # Drop our reference first so a failing cleanup() is not retried on a
        # half-removed directory by a later call.
        tempdir, self._tempdir = self._tempdir, None
        if tempdir is not None:
            tempdir.cleanup()
|
|
|
|
|
|
def _decode_header_value(value: str | None) -> str:
|
|
if value is None:
|
|
return ""
|
|
# Decode RFC2047 encoded words (e.g. =?Windows-1252?Q?...?=)
|
|
decoded_parts = email.header.decode_header(str(value))
|
|
result = ""
|
|
for chunk, charset in decoded_parts:
|
|
if isinstance(chunk, bytes):
|
|
result += chunk.decode(charset or "utf-8", errors="replace")
|
|
else:
|
|
result += chunk
|
|
return result.strip()
|
|
|
|
|
|
def _extract_body(msg: email.message.Message) -> str:
|
|
"""Walk MIME parts and extract the best plain-text body."""
|
|
plain_parts: list[str] = []
|
|
html_parts: list[str] = []
|
|
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
disposition = str(part.get("Content-Disposition", ""))
|
|
# Skip attachments
|
|
if "attachment" in disposition:
|
|
continue
|
|
if ct == "text/plain":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
plain_parts.append(payload.decode(charset, errors="replace"))
|
|
elif ct == "text/html":
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
html_parts.append(payload.decode(charset, errors="replace"))
|
|
else:
|
|
ct = msg.get_content_type()
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
text = payload.decode(charset, errors="replace")
|
|
if ct == "text/plain":
|
|
plain_parts.append(text)
|
|
elif ct == "text/html":
|
|
html_parts.append(text)
|
|
|
|
if plain_parts:
|
|
return "\n\n".join(plain_parts).strip()
|
|
|
|
# Fall back to HTML → plain text via BeautifulSoup
|
|
if html_parts:
|
|
combined_html = "\n".join(html_parts)
|
|
soup = BeautifulSoup(combined_html, "html.parser")
|
|
return soup.get_text(separator="\n").strip()
|
|
|
|
return ""
|
|
|
|
|
|
_IMAGE_CONTENT_TYPES = {
|
|
"image/jpeg", "image/png", "image/gif",
|
|
"image/bmp", "image/tiff", "image/webp",
|
|
}
|
|
_IMAGE_EXTS = {
|
|
"image/jpeg": ".jpg", "image/png": ".png", "image/gif": ".gif",
|
|
"image/bmp": ".bmp", "image/tiff": ".tiff", "image/webp": ".webp",
|
|
}
|
|
|
|
|
|
def _safe_attachment_name(raw_name: str, fallback: str) -> str:
    """Reduce an untrusted attachment name to one safe path component."""
    # Treat "\" as a separator too, so Windows-style paths are also reduced
    # to their last component on POSIX hosts.
    leaf = raw_name.replace("\\", "/").rsplit("/", 1)[-1]
    # Drop NUL and other control characters, which are invalid or misleading
    # in file names.
    leaf = "".join(ch for ch in leaf if ch.isprintable()).strip()
    if leaf in ("", ".", ".."):
        return fallback
    return leaf


def _collect_attachments(
    msg: email.message.Message, tmpdir: Path
) -> list[ParsedAttachment]:
    """Extract all attachment parts and write them to tmpdir.

    Any part with a filename is saved. Inline images (CID-embedded) that
    have no filename are saved too, under a name generated from their
    Content-ID or a running counter. Other unnamed parts are skipped.

    File names come from the message and are untrusted, so each one is
    reduced to a single path component before use; nothing is ever written
    outside ``tmpdir``. Colliding names get a ``_<n>`` suffix.

    Args:
        msg: The parsed message to scan.
        tmpdir: Existing directory that receives the attachment files.

    Returns:
        One ParsedAttachment per file written, in message order.
    """
    attachments: list[ParsedAttachment] = []
    seen_names: set[str] = set()
    inline_image_counter = 0

    for part in msg.walk():
        content_type = part.get_content_type()
        filename = part.get_filename()

        if filename is None:
            if content_type not in _IMAGE_CONTENT_TYPES:
                # Unnamed non-image part (body text, container, ...): skip.
                continue
            # Inline image without a filename: build one from the
            # Content-ID, or from the counter when there is no Content-ID.
            cid = str(part.get("Content-ID", "")).strip("<>").split("@")[0]
            ext = _IMAGE_EXTS.get(content_type, ".img")
            filename = f"inline_{cid or inline_image_counter}{ext}"
            inline_image_counter += 1

        # Check the payload before reserving a name, so parts without
        # decodable content (e.g. multipart containers) consume no name.
        payload = part.get_payload(decode=True)
        if payload is None:
            continue

        # Decode RFC 2047 encoded words, then strip any directory components.
        base_name = _safe_attachment_name(
            _decode_header_value(filename),
            fallback=f"attachment_{len(attachments) + 1}",
        )

        # Avoid duplicates by appending _1, _2, ... before the extension.
        unique_name = base_name
        stem, suffix = Path(base_name).stem, Path(base_name).suffix
        counter = 1
        while unique_name in seen_names:
            unique_name = f"{stem}_{counter}{suffix}"
            counter += 1
        seen_names.add(unique_name)

        dest = tmpdir / unique_name
        dest.write_bytes(payload)
        attachments.append(
            ParsedAttachment(
                filename=unique_name,
                path=dest,
                content_type=content_type,
            )
        )

    return attachments
|
|
|
|
|
|
def parse_eml(eml_path: Path) -> ParsedEmail:
    """Parse an .eml file and return a ParsedEmail object.

    Attachments are written into a fresh temporary directory owned by the
    returned object. Keep the return value alive for as long as the
    attachment paths are needed, and call its ``cleanup()`` method when
    done to delete the directory.

    Args:
        eml_path: Path of the message file to read.

    Returns:
        The decoded Subject/From/To/Date headers, the extracted body text,
        and the extracted attachments.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(eml_path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=email.policy.compat32)

    subject = _decode_header_value(msg.get("Subject"))
    sender = _decode_header_value(msg.get("From"))
    recipient = _decode_header_value(msg.get("To"))
    date = _decode_header_value(msg.get("Date"))
    body_text = _extract_body(msg)

    tmpdir_obj = tempfile.TemporaryDirectory(prefix="email_dlp_")
    try:
        attachments = _collect_attachments(msg, Path(tmpdir_obj.name))
    except BaseException:
        # No ParsedEmail will own the directory, so remove it now instead of
        # leaving partially written attachments until garbage collection.
        tmpdir_obj.cleanup()
        raise

    return ParsedEmail(
        subject=subject,
        sender=sender,
        recipient=recipient,
        date=date,
        body_text=body_text,
        attachments=attachments,
        _tempdir=tmpdir_obj,
    )
|