Files
email-dlp/email_dlp/converter.py
2026-03-20 10:28:28 +08:00

239 lines
8.1 KiB
Python

"""Attachment → markdown text conversion routing."""
import base64
import tempfile
import zipfile
from pathlib import Path
from markitdown import MarkItDown
# Upper bound on the number of characters of extracted text kept per entry;
# longer text is cut at this length by the conversion functions below.
MAX_TEXT_CHARS = 20_000

# Marker prepended to image payloads so they can travel through the
# (text, status) interface. Payload layout:
#   IMAGE_SENTINEL + "<mime_type>:<base64_data>"
IMAGE_SENTINEL = "__IMAGE__:"

# Lower-case file suffix (with leading dot) -> MIME type used in image payloads.
_IMAGE_MIME = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    ".webp": "image/webp",
    ".img": "image/png",  # fallback for generated inline names
}
# Suffixes whose content is already plain text and is read directly instead
# of being routed through markitdown. Built once at import time.
_PLAIN_TEXT_EXTS = frozenset({
    ".py", ".js", ".ts", ".java", ".c", ".cpp", ".h", ".cs",
    ".go", ".rb", ".rs", ".sh", ".txt", ".md", ".sql", ".yaml", ".yml",
    ".json", ".xml", ".html", ".htm", ".css",
})


def _read_text_lossy(filepath: Path) -> str:
    """Read *filepath* as UTF-8, replacing undecodable bytes with U+FFFD."""
    # An explicit encoding keeps the result independent of the host locale.
    return filepath.read_text(encoding="utf-8", errors="replace")


def _convert_single_file(filepath: Path) -> tuple[str, str]:
    """Convert a single file to text. Returns (text, status).

    Routing is decided by the lower-cased suffix of ``filepath``:

    - Image suffixes (keys of ``_IMAGE_MIME``): text is
      IMAGE_SENTINEL + "<mime>:<base64>" and status is "ok:image".
      Callers must check for the sentinel.
    - Plain-text suffixes (``_PLAIN_TEXT_EXTS``): the file is read directly
      as UTF-8 and status is "ok".
    - Anything else: converted with markitdown (status "ok"). If that
      raises, the file is read as plain text instead and status is
      "fallback: <markitdown error>".

    This function reports problems through the status string rather than
    raising: when the file cannot be read at all, it returns
    ("", "failed: <error>").
    """
    suffix = filepath.suffix.lower()

    # Image — return base64 sentinel for VLM consumption.
    mime = _IMAGE_MIME.get(suffix)
    if mime is not None:
        try:
            b64 = base64.b64encode(filepath.read_bytes()).decode()
        except Exception as e:
            # Same failure contract as the text branches below.
            return "", f"failed: {e}"
        return f"{IMAGE_SENTINEL}{mime}:{b64}", "ok:image"

    if suffix in _PLAIN_TEXT_EXTS:
        # Already plain text — read directly, no conversion needed.
        try:
            return _read_text_lossy(filepath), "ok"
        except Exception as e:
            return "", f"failed: {e}"

    # Use markitdown for PDF, DOCX, XLSX, CSV, etc.
    try:
        result = MarkItDown().convert(str(filepath))
        return result.text_content or "", "ok"
    except Exception as e:
        # Best-effort fallback for types markitdown cannot handle: keep the
        # markitdown error in the status so the caller can see why.
        try:
            return _read_text_lossy(filepath), f"fallback: {e}"
        except Exception as e2:
            return "", f"failed: {e2}"
# Lower-case Office suffix -> directory prefix inside the document's ZIP
# container that is scanned for embedded media.
_OFFICE_MEDIA_DIRS = {
    ".docx": "word/media/",
    ".pptx": "ppt/media/",
    ".xlsx": "xl/media/",
}

# Suffixes recognised as images (exactly the keys of _IMAGE_MIME).
_IMAGE_EXTS = set(_IMAGE_MIME)
def _extract_pdf_images(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Extract embedded images from a PDF using PyMuPDF.

    ``filepath`` is the PDF on disk; ``filename`` is only used to build the
    display names, which have the form "<filename>/image_<n>.<ext>".

    Returns list of (display_name, IMAGE_SENTINEL+..., "ok:image").
    Returns empty list if fitz is not installed, the PDF cannot be opened,
    or no images are found. Extraction is best-effort: an image that cannot
    be extracted is skipped and never raises.

    An image object referenced from several pages is returned only once.
    """
    try:
        import fitz  # PyMuPDF
    except ImportError:
        return []

    results: list[tuple[str, str, str]] = []
    try:
        doc = fitz.open(str(filepath))
    except Exception:
        # Unreadable / non-PDF input: nothing to extract.
        return results

    seen_xrefs = set()
    try:
        for page in doc:
            for img in page.get_images():
                xref = img[0]
                # The same xref can be listed on many pages (e.g. a logo);
                # emit each underlying image a single time.
                if xref in seen_xrefs:
                    continue
                seen_xrefs.add(xref)
                try:
                    img_data = doc.extract_image(xref)
                    ext = img_data.get("ext", "png")
                    b64 = base64.b64encode(img_data["image"]).decode()
                except Exception:
                    # One bad image must not discard the remaining ones.
                    continue
                mime = _IMAGE_MIME.get(f".{ext}", f"image/{ext}")
                # Number by successfully extracted images so names stay
                # contiguous even when some images are skipped.
                display_name = f"{filename}/image_{len(results)}.{ext}"
                results.append(
                    (display_name, IMAGE_SENTINEL + f"{mime}:{b64}", "ok:image")
                )
    except Exception:
        # Page iteration failed part-way; keep whatever was collected.
        pass
    finally:
        # Always release the document handle.
        doc.close()
    return results
def _extract_office_images(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Extract embedded images from a DOCX/PPTX/XLSX using zipfile.

    The Office format is chosen from the suffix of ``filename`` (not of
    ``filepath``), which selects the media directory to scan inside the
    ZIP container. Only members under that directory whose suffix is in
    ``_IMAGE_EXTS`` are returned, in sorted member-name order.

    Returns list of (display_name, IMAGE_SENTINEL+..., "ok:image"), where
    display_name is "<filename>/<member basename>".
    Returns empty list if the suffix is not a supported Office type, the
    file is not a valid ZIP, or it has no images. Extraction is
    best-effort: a member that cannot be read is skipped and never raises.
    """
    suffix = Path(filename).suffix.lower()
    media_dir = _OFFICE_MEDIA_DIRS.get(suffix)
    if not media_dir:
        return []

    results: list[tuple[str, str, str]] = []
    try:
        with zipfile.ZipFile(str(filepath), "r") as zf:
            for name in sorted(zf.namelist()):
                if not name.startswith(media_dir):
                    continue
                member_suffix = Path(name).suffix.lower()
                if member_suffix not in _IMAGE_EXTS:
                    continue
                try:
                    raw = zf.read(name)
                except Exception:
                    # A single corrupt/unsupported member must not discard
                    # the images that follow it.
                    continue
                mime = _IMAGE_MIME[member_suffix]
                b64 = base64.b64encode(raw).decode()
                display_name = f"{filename}/{Path(name).name}"
                results.append(
                    (display_name, IMAGE_SENTINEL + f"{mime}:{b64}", "ok:image")
                )
    except Exception:
        # Not a readable ZIP container; keep whatever was collected.
        pass
    return results
def _convert_7z(
    filepath: Path, archive_name: str
) -> list[tuple[str, str, str]]:
    """Extract a .7z archive and convert each member.

    The archive is unpacked into a temporary directory (removed on return)
    and every regular file in it is passed to ``_convert_single_file``.
    Non-image text longer than MAX_TEXT_CHARS is truncated and the status
    gets a "|truncated_at_<N>" suffix.

    Returns list of (display_name, text_or_sentinel, status), one entry per
    member. display_name is "<archive_name>/<path inside the archive>", so a
    top-level member is "archive.7z/member.ext" and a nested one is
    "archive.7z/sub/member.ext".

    Returns a single (archive_name, "", "failed: ...") entry when py7zr is
    not installed or extraction fails, and (archive_name, "", "skipped")
    when the archive yields no files.
    """
    try:
        import py7zr
    except ImportError:
        return [(archive_name, "", "failed: py7zr not installed")]

    results: list[tuple[str, str, str]] = []
    with tempfile.TemporaryDirectory(prefix="email_dlp_7z_") as tmpdir:
        tmp = Path(tmpdir)
        # NOTE(review): nothing here bounds the total extracted size; an
        # archive bomb would be fully unpacked to disk — consider a limit.
        try:
            with py7zr.SevenZipFile(str(filepath), mode="r") as archive:
                archive.extractall(path=str(tmp))
        except Exception as e:
            return [(archive_name, "", f"failed: 7z extraction error: {e}")]

        for member_path in sorted(tmp.rglob("*")):
            # Skip symlinks: is_file() follows them, so an extracted link
            # could otherwise make us read a file outside the temp dir.
            if member_path.is_symlink() or not member_path.is_file():
                continue
            # Use the path relative to the extraction root (not just the
            # basename) so same-named files in different folders stay
            # distinguishable.
            relative_name = member_path.relative_to(tmp).as_posix()
            display_name = f"{archive_name}/{relative_name}"
            text, status = _convert_single_file(member_path)
            if not text.startswith(IMAGE_SENTINEL) and len(text) > MAX_TEXT_CHARS:
                text = text[:MAX_TEXT_CHARS]
                status = f"{status}|truncated_at_{MAX_TEXT_CHARS}"
            results.append((display_name, text, status))

    return results if results else [(archive_name, "", "skipped")]
def _convert_zip(
    filepath: Path, archive_name: str
) -> list[tuple[str, str, str]]:
    """Extract a .zip archive and convert each member.

    The archive is unpacked into a temporary directory (removed on return)
    and every regular file in it is passed to ``_convert_single_file``.
    Non-image text longer than MAX_TEXT_CHARS is truncated and the status
    gets a "|truncated_at_<N>" suffix.

    Returns list of (display_name, text_or_sentinel, status), one entry per
    member. display_name is "<archive_name>/<path inside the archive>", so a
    top-level member is "archive.zip/member.ext" and a nested one is
    "archive.zip/sub/member.ext".

    Returns a single (archive_name, "", "failed: zip extraction error: ...")
    entry when extraction fails, and (archive_name, "", "skipped") when the
    archive yields no files.
    """
    results: list[tuple[str, str, str]] = []
    with tempfile.TemporaryDirectory(prefix="email_dlp_zip_") as tmpdir:
        tmp = Path(tmpdir)
        # NOTE(review): nothing here bounds the total extracted size; an
        # archive bomb would be fully unpacked to disk — consider a limit.
        try:
            with zipfile.ZipFile(str(filepath), mode="r") as archive:
                archive.extractall(path=str(tmp))
        except Exception as e:
            return [(archive_name, "", f"failed: zip extraction error: {e}")]

        for member_path in sorted(tmp.rglob("*")):
            if not member_path.is_file():
                continue
            # Use the path relative to the extraction root (not just the
            # basename) so same-named files in different folders stay
            # distinguishable.
            relative_name = member_path.relative_to(tmp).as_posix()
            display_name = f"{archive_name}/{relative_name}"
            text, status = _convert_single_file(member_path)
            if not text.startswith(IMAGE_SENTINEL) and len(text) > MAX_TEXT_CHARS:
                text = text[:MAX_TEXT_CHARS]
                status = f"{status}|truncated_at_{MAX_TEXT_CHARS}"
            results.append((display_name, text, status))

    return results if results else [(archive_name, "", "skipped")]
def convert_attachment(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Convert an attachment file for LLM analysis.

    Routing is based on the lower-cased suffix of ``filename``; the content
    is read from ``filepath``.

    Returns list of (display_name, text_or_sentinel, status).
    - .7z and .zip archives: delegated to the archive converters, which
      return one element per member file inside the archive.
    - Any other file: first element is the file itself; for PDF and
      DOCX/PPTX/XLSX files, one further element per embedded image follows.

    text_or_sentinel is either plain text or IMAGE_SENTINEL + "<mime>:<base64>"
    for image files. Text is truncated to MAX_TEXT_CHARS (images are not
    truncated), and a truncated entry gets "|truncated_at_<N>" appended to
    its status.
    """
    suffix = Path(filename).suffix.lower()

    # Archives are expanded member by member by their dedicated converter.
    archive_converters = {".7z": _convert_7z, ".zip": _convert_zip}
    unpack = archive_converters.get(suffix)
    if unpack is not None:
        return unpack(filepath, filename)

    text, status = _convert_single_file(filepath)
    is_image_payload = text.startswith(IMAGE_SENTINEL)
    if len(text) > MAX_TEXT_CHARS and not is_image_payload:
        text = text[:MAX_TEXT_CHARS]
        status = f"{status}|truncated_at_{MAX_TEXT_CHARS}"

    # For PDF and Office files, also extract embedded images.
    if suffix == ".pdf":
        embedded = _extract_pdf_images(filepath, filename)
    elif suffix in _OFFICE_MEDIA_DIRS:
        embedded = _extract_office_images(filepath, filename)
    else:
        embedded = []

    return [(filename, text, status), *embedded]