from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path
from threading import Lock, RLock
from typing import Any

from app.models import (
    BackendCredential,
    BackendRecord,
    ChannelSettings,
    OutlookSettings,
    UserRecord,
    utcnow_iso,
)


class JsonStore:
    """Persist backends, credentials, users, permissions and settings as JSON files.

    Each collection lives in its own file under ``data_dir``. Every
    read-modify-write sequence is performed while holding one re-entrant lock,
    so concurrent callers sharing this instance cannot overwrite each other's
    updates. The lock is in-process only; nothing here coordinates with other
    processes or other ``JsonStore`` instances writing the same directory.
    """

    def __init__(self, data_dir: Path):
        """Create ``data_dir`` (and parents) if missing and set up the lock."""
        self.data_dir = data_dir
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Re-entrant: the save_* methods hold the lock and then call
        # _write_json, which acquires it again on the same thread.
        self._lock = RLock()

    # ------------------------------------------------------------------
    # File locations
    # ------------------------------------------------------------------

    @property
    def backends_path(self) -> Path:
        """Path of the file holding backend records."""
        return self.data_dir / "backends.json"

    @property
    def credentials_path(self) -> Path:
        """Path of the file holding backend credentials."""
        return self.data_dir / "backend_credentials.json"

    @property
    def permissions_path(self) -> Path:
        """Path of the file holding per-backend permissions."""
        return self.data_dir / "permissions.json"

    @property
    def users_path(self) -> Path:
        """Path of the file holding user records."""
        return self.data_dir / "users.json"

    @property
    def settings_path(self) -> Path:
        """Path of the file holding per-backend settings."""
        return self.data_dir / "settings.json"

    # ------------------------------------------------------------------
    # Low-level JSON helpers
    # ------------------------------------------------------------------

    def _read_json(self, path: Path, default: dict[str, Any]) -> dict[str, Any]:
        """Return the JSON object stored at ``path``, or ``default``.

        ``default`` is returned when the file is missing or unreadable, when
        its content is not valid UTF-8 JSON, or when the top-level JSON value
        is not an object (callers rely on ``.get`` being available).
        """
        try:
            loaded = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError covers a missing file; ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return default
        return loaded if isinstance(loaded, dict) else default

    def _write_json(self, path: Path, payload: dict[str, Any]) -> None:
        """Serialize ``payload`` to ``path`` via a temp file and ``os.replace``.

        The temp file is created next to the target so the final replace stays
        within one directory. If anything fails before the replace completes,
        the temp file is removed and the original exception is re-raised.
        """
        with self._lock:
            path.parent.mkdir(parents=True, exist_ok=True)
            fd, tmp_name = tempfile.mkstemp(
                dir=str(path.parent), prefix=path.name, suffix=".tmp"
            )
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as handle:
                    json.dump(payload, handle, indent=2, ensure_ascii=False)
                os.replace(tmp_name, path)
            except BaseException:
                # Best-effort cleanup; never let it mask the real failure.
                try:
                    os.unlink(tmp_name)
                except OSError:
                    pass
                raise

    def _load_records(self, path: Path, key: str) -> list[dict[str, Any]]:
        """Return the dict entries of the list stored under ``key`` in ``path``.

        Yields an empty list when the value is not a list; entries that are
        not dicts are skipped.
        """
        items = self._read_json(path, {key: []}).get(key, [])
        if not isinstance(items, list):
            return []
        return [item for item in items if isinstance(item, dict)]

    def _load_settings_root(self) -> dict[str, Any] | None:
        """Return the ``"settings"`` mapping from the settings file.

        Returns ``None`` when the stored value is not a dict, so each caller
        can pick its own fallback.
        """
        root = self._read_json(self.settings_path, {"settings": {}}).get("settings", {})
        return root if isinstance(root, dict) else None

    # ------------------------------------------------------------------
    # Backends
    # ------------------------------------------------------------------

    def list_backends(self) -> list[BackendRecord]:
        """Return every stored backend record."""
        return [
            BackendRecord.model_validate(item)
            for item in self._load_records(self.backends_path, "backends")
        ]

    def get_backend(self, backend_id: str) -> BackendRecord | None:
        """Return the backend whose ``backend_id`` matches, or ``None``."""
        for backend in self.list_backends():
            if backend.backend_id == backend_id:
                return backend
        return None

    def save_backend(self, backend: BackendRecord) -> None:
        """Insert or replace ``backend`` (keyed by ``backend_id``), sorted by id."""
        with self._lock:
            backends = [
                item for item in self.list_backends() if item.backend_id != backend.backend_id
            ]
            backends.append(backend)
            backends.sort(key=lambda item: item.backend_id)
            self._write_json(
                self.backends_path,
                {"backends": [item.model_dump(mode="json") for item in backends]},
            )

    # ------------------------------------------------------------------
    # Credentials
    # ------------------------------------------------------------------

    def list_credentials(self) -> list[BackendCredential]:
        """Return every stored backend credential."""
        return [
            BackendCredential.model_validate(item)
            for item in self._load_records(self.credentials_path, "credentials")
        ]

    def get_credential_by_client_id(self, client_id: str) -> BackendCredential | None:
        """Return the credential whose ``client_id`` matches, or ``None``."""
        for item in self.list_credentials():
            if item.client_id == client_id:
                return item
        return None

    def save_credential(self, credential: BackendCredential) -> None:
        """Insert or replace ``credential`` (keyed by ``backend_id``), sorted by id."""
        with self._lock:
            creds = [
                item
                for item in self.list_credentials()
                if item.backend_id != credential.backend_id
            ]
            creds.append(credential)
            creds.sort(key=lambda item: item.backend_id)
            self._write_json(
                self.credentials_path,
                {"credentials": [item.model_dump(mode="json") for item in creds]},
            )

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def list_users(self) -> list[UserRecord]:
        """Return every stored user record."""
        return [
            UserRecord.model_validate(item)
            for item in self._load_records(self.users_path, "users")
        ]

    def get_user(self, username: str) -> UserRecord | None:
        """Return the user whose ``username`` matches, or ``None``."""
        for item in self.list_users():
            if item.username == username:
                return item
        return None

    def save_user(self, user: UserRecord) -> None:
        """Insert or replace ``user`` (keyed by ``username``), sorted by username."""
        with self._lock:
            users = [item for item in self.list_users() if item.username != user.username]
            users.append(user)
            users.sort(key=lambda item: item.username)
            self._write_json(
                self.users_path,
                {"users": [item.model_dump(mode="json") for item in users]},
            )

    # ------------------------------------------------------------------
    # Permissions
    # ------------------------------------------------------------------

    def get_permissions(self, backend_id: str) -> dict[str, Any]:
        """Return the permissions dict stored for ``backend_id`` (``{}`` if none)."""
        raw = self._read_json(self.permissions_path, {"permissions": {}})
        permissions = raw.get("permissions", {})
        if not isinstance(permissions, dict):
            return {}
        value = permissions.get(backend_id, {})
        return value if isinstance(value, dict) else {}

    def save_permissions(self, backend_id: str, permissions: dict[str, Any]) -> dict[str, Any]:
        """Store ``permissions`` for ``backend_id`` and return them."""
        with self._lock:
            raw = self._read_json(self.permissions_path, {"permissions": {}})
            root = raw.get("permissions", {})
            if not isinstance(root, dict):
                root = {}
            root[backend_id] = permissions
            self._write_json(self.permissions_path, {"permissions": root})
        return permissions

    # ------------------------------------------------------------------
    # Settings (outlook + channels)
    # ------------------------------------------------------------------

    def get_outlook_settings(self, backend_id: str) -> OutlookSettings | None:
        """Return the Outlook settings stored for ``backend_id``, or ``None``."""
        root = self._load_settings_root()
        if root is None:
            return None
        backend_settings = root.get(backend_id, {})
        if not isinstance(backend_settings, dict):
            return None
        outlook = backend_settings.get("outlook")
        if not isinstance(outlook, dict):
            return None
        return OutlookSettings.model_validate(outlook)

    def list_channel_settings(self, backend_id: str) -> dict[str, ChannelSettings]:
        """Return channel settings for ``backend_id`` keyed by channel id.

        Entries whose key is blank or whose value is not a dict are skipped.
        """
        root = self._load_settings_root()
        if root is None:
            return {}
        backend_settings = root.get(backend_id, {})
        if not isinstance(backend_settings, dict):
            return {}
        channels = backend_settings.get("channels", {})
        if not isinstance(channels, dict):
            return {}
        return {
            str(channel_id): ChannelSettings.model_validate(settings)
            for channel_id, settings in channels.items()
            if isinstance(channel_id, str) and channel_id.strip() and isinstance(settings, dict)
        }

    def get_channel_settings(self, backend_id: str, channel_id: str) -> ChannelSettings | None:
        """Return the settings of one channel of ``backend_id``, or ``None``."""
        return self.list_channel_settings(backend_id).get(channel_id)

    def save_outlook_settings(self, backend_id: str, settings: OutlookSettings) -> dict[str, Any]:
        """Store Outlook ``settings`` for ``backend_id``.

        Returns the backend's full settings dict as written.
        """
        with self._lock:
            root = self._load_settings_root()
            if root is None:
                root = {}
            backend_settings = root.get(backend_id, {})
            if not isinstance(backend_settings, dict):
                backend_settings = {}
            backend_settings["outlook"] = settings.model_dump(mode="json")
            root[backend_id] = backend_settings
            self._write_json(self.settings_path, {"settings": root})
        return backend_settings

    def save_channel_settings(
        self, backend_id: str, channel_id: str, settings: ChannelSettings
    ) -> dict[str, Any]:
        """Store ``settings`` for one channel of ``backend_id``.

        Returns the serialized settings of that channel as written.
        """
        with self._lock:
            root = self._load_settings_root()
            if root is None:
                root = {}
            backend_settings = root.get(backend_id, {})
            if not isinstance(backend_settings, dict):
                backend_settings = {}
            channels = backend_settings.get("channels", {})
            if not isinstance(channels, dict):
                channels = {}
            channels[channel_id] = settings.model_dump(mode="json")
            backend_settings["channels"] = channels
            root[backend_id] = backend_settings
            self._write_json(self.settings_path, {"settings": root})
        return channels[channel_id]

    def delete_outlook_settings(self, backend_id: str) -> bool:
        """Remove the Outlook settings of ``backend_id``.

        Returns ``True`` when something was removed, ``False`` when there was
        nothing to delete. A backend entry left empty is dropped entirely.
        """
        with self._lock:
            root = self._load_settings_root()
            if root is None:
                return False
            backend_settings = root.get(backend_id)
            if not isinstance(backend_settings, dict) or "outlook" not in backend_settings:
                return False
            backend_settings.pop("outlook", None)
            if backend_settings:
                root[backend_id] = backend_settings
            else:
                root.pop(backend_id, None)
            self._write_json(self.settings_path, {"settings": root})
        return True

    def delete_channel_settings(self, backend_id: str, channel_id: str) -> bool:
        """Remove the settings of one channel of ``backend_id``.

        Returns ``True`` when something was removed, ``False`` when there was
        nothing to delete. An emptied ``"channels"`` mapping, and a backend
        entry left empty as a result, are dropped.
        """
        with self._lock:
            root = self._load_settings_root()
            if root is None:
                return False
            backend_settings = root.get(backend_id)
            if not isinstance(backend_settings, dict):
                return False
            channels = backend_settings.get("channels")
            if not isinstance(channels, dict) or channel_id not in channels:
                return False
            channels.pop(channel_id, None)
            if channels:
                backend_settings["channels"] = channels
            else:
                backend_settings.pop("channels", None)
            if backend_settings:
                root[backend_id] = backend_settings
            else:
                root.pop(backend_id, None)
            self._write_json(self.settings_path, {"settings": root})
        return True

    # ------------------------------------------------------------------
    # Backend status helpers
    # ------------------------------------------------------------------

    def _update_backend(self, backend_id: str, status: str | None = None) -> BackendRecord | None:
        """Refresh ``updated_at`` (and optionally ``status``) of a backend and save it.

        Returns the updated record, or ``None`` when ``backend_id`` is unknown.
        The lookup and the save happen under the same lock acquisition.
        """
        with self._lock:
            backend = self.get_backend(backend_id)
            if backend is None:
                return None
            if status is not None:
                backend.status = status
            backend.updated_at = utcnow_iso()
            self.save_backend(backend)
            return backend

    def disable_backend(self, backend_id: str) -> BackendRecord | None:
        """Set the backend's status to ``"disabled"``; ``None`` if it does not exist."""
        return self._update_backend(backend_id, "disabled")

    def enable_backend(self, backend_id: str) -> BackendRecord | None:
        """Set the backend's status to ``"active"``; ``None`` if it does not exist."""
        return self._update_backend(backend_id, "active")

    def touch_backend(self, backend_id: str) -> BackendRecord | None:
        """Refresh the backend's ``updated_at`` only; ``None`` if it does not exist."""
        return self._update_backend(backend_id)