"""Attachment → markdown text conversion routing."""

import base64
import tempfile
import zipfile
from pathlib import Path

# markitdown is treated as optional, like fitz and py7zr further down: when it
# is missing, rich documents fall back to a plain-text read and the returned
# status says so.
try:
    from markitdown import MarkItDown
except ImportError:
    MarkItDown = None

MAX_TEXT_CHARS = 20_000

# Sentinel prefix used to pass image data through the (text, status) interface.
# Format: IMAGE_SENTINEL + "<mime>:<base64>"
IMAGE_SENTINEL = "__IMAGE__:"

_IMAGE_MIME = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    ".webp": "image/webp",
    ".img": "image/png",  # fallback for generated inline names
}

# Extensions that are read directly as text instead of going through markitdown.
_PLAIN_TEXT_EXTS = frozenset({
    ".py", ".js", ".ts", ".java", ".c", ".cpp", ".h", ".cs", ".go",
    ".rb", ".rs", ".sh", ".txt", ".md", ".sql", ".yaml", ".yml",
    ".json", ".xml", ".html", ".htm", ".css",
})


def _image_payload(mime: str, data: bytes) -> str:
    """Return IMAGE_SENTINEL + "<mime>:<base64>" for raw image bytes."""
    return IMAGE_SENTINEL + f"{mime}:{base64.b64encode(data).decode()}"


def _truncate_text(text: str, status: str) -> tuple[str, str]:
    """Cap plain text at MAX_TEXT_CHARS and record the truncation in status.

    Image sentinel payloads are returned untouched: cutting base64 data would
    corrupt the image.
    """
    if text.startswith(IMAGE_SENTINEL) or len(text) <= MAX_TEXT_CHARS:
        return text, status
    return text[:MAX_TEXT_CHARS], f"{status}|truncated_at_{MAX_TEXT_CHARS}"


def _convert_single_file(filepath: Path) -> tuple[str, str]:
    """Convert a single file to text.

    Returns (text, status).

    For image files, text is IMAGE_SENTINEL + "<mime>:<base64>" and status is
    "ok:image". Callers must check for the sentinel.

    Other statuses: "ok" on success, "fallback: <reason>" when markitdown was
    unavailable or failed and the file was read as plain text instead, and
    "failed: <error>" (with empty text) when nothing could be read.
    """
    suffix = filepath.suffix.lower()

    # Image — return base64 sentinel for VLM consumption
    if suffix in _IMAGE_MIME:
        return _image_payload(_IMAGE_MIME[suffix], filepath.read_bytes()), "ok:image"

    if suffix in _PLAIN_TEXT_EXTS:
        # Known text format — read directly. The encoding is pinned so the
        # result does not depend on the host locale; undecodable bytes become
        # U+FFFD instead of raising.
        try:
            return filepath.read_text(encoding="utf-8", errors="replace"), "ok"
        except Exception as e:
            return "", f"failed: {e}"

    # Use markitdown for PDF, DOCX, XLSX, CSV, etc.
    if MarkItDown is None:
        reason = "markitdown not installed"
    else:
        try:
            result = MarkItDown().convert(str(filepath))
            return result.text_content or "", "ok"
        except Exception as e:
            reason = e

    # Fallback to plain-text read for unknown types
    try:
        text = filepath.read_text(encoding="utf-8", errors="replace")
        return text, f"fallback: {reason}"
    except Exception as e2:
        return "", f"failed: {e2}"


_OFFICE_MEDIA_DIRS = {
    ".docx": "word/media/",
    ".pptx": "ppt/media/",
    ".xlsx": "xl/media/",
}

_IMAGE_EXTS = set(_IMAGE_MIME.keys())


def _extract_pdf_images(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Extract embedded images from a PDF using PyMuPDF.

    Returns list of (display_name, IMAGE_SENTINEL+..., "ok:image"), with
    display names of the form "<filename>/image_<n>.<ext>".
    Returns empty list if fitz is not installed or no images found.
    """
    try:
        import fitz  # PyMuPDF
    except ImportError:
        return []

    results: list[tuple[str, str, str]] = []
    try:
        # The context manager closes the document (and its file handle) even
        # when extraction fails part-way through.
        with fitz.open(str(filepath)) as doc:
            img_index = 0
            for page in doc:
                for img in page.get_images():
                    xref = img[0]
                    img_data = doc.extract_image(xref)
                    ext = img_data.get("ext", "png")
                    mime = _IMAGE_MIME.get(f".{ext}", f"image/{ext}")
                    display_name = f"{filename}/image_{img_index}.{ext}"
                    results.append(
                        (display_name, _image_payload(mime, img_data["image"]), "ok:image")
                    )
                    img_index += 1
    except Exception:
        # Best effort: image extraction is a supplement to the text result, so
        # a damaged PDF yields whatever was collected before the error.
        pass
    return results


def _extract_office_images(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Extract embedded images from a DOCX/PPTX/XLSX using zipfile.

    The media directory to scan is chosen from the extension of *filename*
    (not *filepath*). Only members whose extension is in _IMAGE_EXTS are
    returned, in sorted member-name order.

    Returns list of (display_name, IMAGE_SENTINEL+..., "ok:image").
    Returns empty list if the file is not a valid ZIP or has no images.
    """
    suffix = Path(filename).suffix.lower()
    media_dir = _OFFICE_MEDIA_DIRS.get(suffix)
    if not media_dir:
        return []

    results: list[tuple[str, str, str]] = []
    try:
        with zipfile.ZipFile(str(filepath), "r") as zf:
            for name in sorted(zf.namelist()):
                if not name.startswith(media_dir):
                    continue
                member_suffix = Path(name).suffix.lower()
                if member_suffix not in _IMAGE_EXTS:
                    continue
                display_name = f"{filename}/{Path(name).name}"
                payload = _image_payload(_IMAGE_MIME[member_suffix], zf.read(name))
                results.append((display_name, payload, "ok:image"))
    except Exception:
        # Best effort: keep whatever was read before the container turned out
        # to be invalid or unreadable.
        pass
    return results


def _convert_extracted_members(
    root: Path, archive_name: str
) -> list[tuple[str, str, str]]:
    """Convert every regular file under *root* (an extracted archive).

    Members are visited in sorted path order. Display names are
    "<archive_name>/<path inside the archive>" so that same-named files in
    different sub-directories stay distinguishable. An archive without any
    files yields a single (archive_name, "", "skipped") entry.
    """
    results: list[tuple[str, str, str]] = []
    for member_path in sorted(root.rglob("*")):
        if not member_path.is_file():
            continue
        relative_name = member_path.relative_to(root).as_posix()
        text, status = _truncate_text(*_convert_single_file(member_path))
        results.append((f"{archive_name}/{relative_name}", text, status))
    return results if results else [(archive_name, "", "skipped")]


def _convert_7z(
    filepath: Path, archive_name: str
) -> list[tuple[str, str, str]]:
    """Extract a .7z archive and convert each member.

    Returns list of (display_name, text_or_sentinel, status), one entry per
    member. display_name uses "archive.7z/member.ext" format (nested members
    keep their sub-directory path). A missing py7zr or an extraction error
    yields a single "failed: ..." entry.
    """
    try:
        import py7zr
    except ImportError:
        return [(archive_name, "", "failed: py7zr not installed")]

    # NOTE(review): the archive is fully extracted with no size cap; confirm
    # that upstream limits attachment size if archives are untrusted.
    with tempfile.TemporaryDirectory(prefix="email_dlp_7z_") as tmpdir:
        try:
            with py7zr.SevenZipFile(str(filepath), mode="r") as archive:
                archive.extractall(path=tmpdir)
        except Exception as e:
            return [(archive_name, "", f"failed: 7z extraction error: {e}")]
        return _convert_extracted_members(Path(tmpdir), archive_name)


def _convert_zip(
    filepath: Path, archive_name: str
) -> list[tuple[str, str, str]]:
    """Extract a .zip archive and convert each member.

    Returns list of (display_name, text_or_sentinel, status), one entry per
    member. display_name uses "archive.zip/member.ext" format (nested members
    keep their sub-directory path). An extraction error yields a single
    "failed: ..." entry.
    """
    # NOTE(review): the archive is fully extracted with no size cap; confirm
    # that upstream limits attachment size if archives are untrusted.
    with tempfile.TemporaryDirectory(prefix="email_dlp_zip_") as tmpdir:
        try:
            with zipfile.ZipFile(str(filepath), mode="r") as archive:
                archive.extractall(path=tmpdir)
        except Exception as e:
            return [(archive_name, "", f"failed: zip extraction error: {e}")]
        return _convert_extracted_members(Path(tmpdir), archive_name)


def convert_attachment(
    filepath: Path, filename: str
) -> list[tuple[str, str, str]]:
    """Convert an attachment file for LLM analysis.

    Routing is based on the extension of *filename*; *filepath* is where the
    bytes are read from.

    Returns list of (display_name, text_or_sentinel, status).
    - .7z / .zip archives: one element per member file inside the archive.
    - .pdf and .docx/.pptx/.xlsx: the document text first, followed by one
      element per embedded image that could be extracted.
    - Any other file: single-element list.

    text_or_sentinel is either plain text or IMAGE_SENTINEL + "<mime>:<base64>"
    for image files. Text is truncated to MAX_TEXT_CHARS (images are not
    truncated).
    """
    suffix = Path(filename).suffix.lower()

    if suffix == ".7z":
        return _convert_7z(filepath, filename)
    if suffix == ".zip":
        return _convert_zip(filepath, filename)

    text, status = _truncate_text(*_convert_single_file(filepath))
    results = [(filename, text, status)]

    # For PDF and Office files, also extract embedded images
    if suffix == ".pdf":
        results.extend(_extract_pdf_images(filepath, filename))
    elif suffix in _OFFICE_MEDIA_DIRS:
        results.extend(_extract_office_images(filepath, filename))

    return results