"""Shared helpers for platform channel adapters.""" from __future__ import annotations from dataclasses import dataclass from typing import Any from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage @dataclass(slots=True) class OutboundTarget: peer_id: str | None thread_id: str | None = None peer_type: str = "unknown" user_id: str | None = None class PlatformDeliveryError(RuntimeError): """Raised when a platform client rejects a delivery.""" def config_bool(config: dict[str, Any], key: str, *, default: bool = False) -> bool: value = config.get(key) if value is None: return default if isinstance(value, bool): return value if isinstance(value, (int, float)): return bool(value) text = str(value).strip().lower() if text in {"1", "true", "yes", "on"}: return True if text in {"0", "false", "no", "off"}: return False return default def config_list(config: dict[str, Any], key: str) -> list[str]: value = config.get(key) if value is None: return [] if isinstance(value, str): return [part.strip() for part in value.split(",") if part.strip()] if isinstance(value, (list, tuple, set)): return [str(item).strip() for item in value if str(item).strip()] text = str(value).strip() return [text] if text else [] def chunk_text(text: str, *, max_chars: int) -> list[str]: if max_chars <= 0: raise ValueError("max_chars must be positive") if not text: return [""] return [text[index : index + max_chars] for index in range(0, len(text), max_chars)] def compact_media_summary(media_type: str, *, file_name: str | None = None) -> str: label = str(media_type or "attachment").strip() or "attachment" if file_name: return f"[{label}: {file_name}]" return f"[{label}]" def target_from_session_id(session_id: str | None) -> OutboundTarget: if not session_id: return OutboundTarget(peer_id=None) parts = str(session_id).split(":") if len(parts) < 3: return OutboundTarget(peer_id=None) thread_id = parts[3] if len(parts) > 3 and parts[3] else None return OutboundTarget(peer_id=parts[2] or None, thread_id=thread_id) def outbound_target(message: OutboundMessage) -> OutboundTarget: identity = message.channel_identity if identity is None: return target_from_session_id(message.session_id) return OutboundTarget( peer_id=identity.peer_id, thread_id=identity.thread_id, peer_type=identity.peer_type, user_id=identity.user_id, ) def mark_unclaimed(message: OutboundMessage) -> None: message.metadata["delivery_status"] = "unclaimed" def build_inbound_message( *, channel_id: str, kind: str, account_id: str, peer_id: str, content: str, message_id: str | None, peer_type: str, user_id: str | None = None, thread_id: str | None = None, metadata: dict[str, Any] | None = None, ) -> InboundMessage: identity = ChannelIdentity( channel_id=channel_id, kind=kind, account_id=account_id, peer_id=peer_id, thread_id=thread_id, peer_type=peer_type, user_id=user_id, message_id=message_id, ) return InboundMessage( channel=channel_id, content=content, session_id=identity.session_id(), user_id=user_id, message_id=message_id or "", channel_identity=identity, metadata=metadata or {}, ) def allowed_by_policy( *, policy: str | None, identifier: str | None, allowlist: list[str], default: str = "open", ) -> bool: effective = (policy or default).strip().lower() if effective == "disabled": return False if effective == "allowlist": return bool(identifier and identifier in allowlist) return True