# Chat Platform Channel Adapters Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use `superpowers:subagent-driven-development` for implementation across the independent adapter tasks, or `superpowers:executing-plans` for inline execution. Track progress with the checkbox steps below.

**Goal:** Add first-class Beaver adapters for `telegram`, `feishu`, `qqbot`, and `weixin`, each plugged into `ChannelRuntime`, normalizing inbound platform events into `InboundMessage` with `ChannelIdentity`, and delivering final `OutboundMessage` replies back to the original platform conversation.

**Architecture:** Platform adapters are hosted in the backend process. They own transport and platform delivery details. `ChannelRuntime` remains the owner of lifecycle, dedupe, event logging, bus routing, and agent dispatch.

```text
platform SDK/API
  -> {Channel}Adapter
  -> ChannelRuntime.accept_inbound()
  -> MessageBus.inbound
  -> ChannelRuntime bridge
  -> AgentService.handle_inbound_message()
  -> MessageBus.outbound
  -> ChannelManager.dispatch_outbound()
  -> {Channel}Adapter.send()
  -> platform SDK/API
```

## Locked Decisions

- Implement these adapters only: `FeishuAdapter`, `QQBotAdapter`, `TelegramAdapter`, `WeixinAdapter`.
- Do not add WhatsApp in this phase.
- Do not add a sidecar process as the default channel path.
- Keep adapter class names platform-only: `FeishuAdapter`, `QQBotAdapter`, `TelegramAdapter`, `WeixinAdapter`.
- Keep platform imports lazy, so backend imports and unrelated channels work without every optional package installed.
- Adapter constructors must accept injectable clients or factories for unit tests.
- Unit tests must not call real platform networks.
- Runtime adapter selection is by existing `ChannelConfig.kind` and `ChannelConfig.mode`.
- Inbound messages must include `ChannelIdentity`.
- Outbound send targets must prefer `message.channel_identity`, then fall back to the normalized `message.session_id`.
- Unsupported or incomplete media appears in content and compact metadata instead of being silently dropped.
- Streaming token deltas, pairing flows, hot reload, and platform config UI are not included.
- Weixin group delivery is best-effort only and is not listed as a stable group capability.

## File Structure

- Create: `app-instance/backend/beaver/interfaces/channels/platforms/__init__.py`
- Create: `app-instance/backend/beaver/interfaces/channels/platforms/base.py`
- Create: `app-instance/backend/beaver/interfaces/channels/platforms/telegram.py`
- Create: `app-instance/backend/beaver/interfaces/channels/platforms/feishu.py`
- Create: `app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py`
- Create: `app-instance/backend/beaver/interfaces/channels/platforms/weixin.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py`
- Modify: `app-instance/backend/pyproject.toml`
- Create: `app-instance/backend/tests/unit/test_platform_channel_helpers.py`
- Create: `app-instance/backend/tests/unit/test_telegram_channel_adapter.py`
- Create: `app-instance/backend/tests/unit/test_feishu_channel_adapter.py`
- Create: `app-instance/backend/tests/unit/test_qqbot_channel_adapter.py`
- Create: `app-instance/backend/tests/unit/test_weixin_channel_adapter.py`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`
- Modify: `app-instance/backend/tests/unit/test_imports.py`

## Dependency Strategy

Add optional extras for live adapters. Do not put these packages in the base backend dependency list.

```toml
[project.optional-dependencies]
dev = [
  "pytest>=9.0.0,<10.0.0",
]
telegram = [
  "python-telegram-bot>=22.0,<23.0",
]
feishu = [
  "lark-oapi>=1.4.22,<2.0.0",
]
qqbot = [
  "aiohttp>=3.9.0,<4.0.0",
]
weixin = [
  "aiohttp>=3.9.0,<4.0.0",
]
channels = [
  "python-telegram-bot>=22.0,<23.0",
  "lark-oapi>=1.4.22,<2.0.0",
  "aiohttp>=3.9.0,<4.0.0",
]
```

The plan keeps `httpx` as the default HTTP client where it is already sufficient.
Use `aiohttp` only for platform SDK paths that require it.

## Shared Data Rules

All normalized identities use:

```text
{channel_id}:{account_id}:{peer_id}[:{thread_id}]
```

Outbound target resolution:

```python
def outbound_target(message: OutboundMessage) -> OutboundTarget:
    if message.channel_identity is not None:
        identity = message.channel_identity
        return OutboundTarget(
            peer_id=identity.peer_id,
            thread_id=identity.thread_id,
            peer_type=identity.peer_type,
            user_id=identity.user_id,
        )
    return target_from_session_id(message.session_id)
```

If neither identity nor session id can produce a peer id, adapter `send()` sets `message.metadata["delivery_status"] = "unclaimed"` and returns without calling the platform client.

## Task 1: Shared Platform Helpers

**Files:**

- Create `app-instance/backend/beaver/interfaces/channels/platforms/__init__.py`
- Create `app-instance/backend/beaver/interfaces/channels/platforms/base.py`
- Create `app-instance/backend/tests/unit/test_platform_channel_helpers.py`

### Steps

- [ ] Write failing tests in `test_platform_channel_helpers.py`:

```python
from beaver.foundation.events import ChannelIdentity, OutboundMessage
from beaver.interfaces.channels.platforms.base import (
    chunk_text,
    compact_media_summary,
    config_bool,
    config_list,
    outbound_target,
)


def test_config_helpers_normalize_common_values() -> None:
    assert config_bool({"enabled": "true"}, "enabled", default=False) is True
    assert config_bool({"enabled": "0"}, "enabled", default=True) is False
    assert config_list({"allowFrom": "u1,u2"}, "allowFrom") == ["u1", "u2"]
    assert config_list({"allowFrom": ["u1", 2]}, "allowFrom") == ["u1", "2"]


def test_chunk_text_preserves_order_and_limit() -> None:
    chunks = chunk_text("abcdef", max_chars=2)
    assert chunks == ["ab", "cd", "ef"]


def test_outbound_target_prefers_channel_identity() -> None:
    identity = ChannelIdentity(
        channel_id="telegram-main",
        kind="telegram",
        account_id="bot-main",
        peer_id="chat-1",
        thread_id="topic-1",
        peer_type="group",
        user_id="user-1",
    )
    message = OutboundMessage(
        channel="telegram-main",
        content="ok",
        session_id="ignored",
        finish_reason="stop",
        channel_identity=identity,
    )
    target = outbound_target(message)
    assert target.peer_id == "chat-1"
    assert target.thread_id == "topic-1"
    assert target.peer_type == "group"
    assert target.user_id == "user-1"


def test_outbound_target_falls_back_to_session_id() -> None:
    message = OutboundMessage(
        channel="telegram-main",
        content="ok",
        session_id="telegram-main:bot-main:chat-1:topic-1",
        finish_reason="stop",
    )
    target = outbound_target(message)
    assert target.peer_id == "chat-1"
    assert target.thread_id == "topic-1"


def test_compact_media_summary_mentions_attachment_type() -> None:
    assert compact_media_summary("photo", file_name="cat.png") == "[photo: cat.png]"
    assert compact_media_summary("document") == "[document]"
```

- [ ] Run the focused test and confirm it fails because the module does not exist:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q
```

Expected first result:

```text
ModuleNotFoundError: No module named 'beaver.interfaces.channels.platforms'
```

- [ ] Implement `platforms/__init__.py`:

```python
"""Platform channel adapters."""
```

- [ ] Implement `platforms/base.py`:

```python
"""Shared helpers for platform channel adapters."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage


@dataclass(slots=True)
class OutboundTarget:
    peer_id: str | None
    thread_id: str | None = None
    peer_type: str = "unknown"
    user_id: str | None = None


class PlatformDeliveryError(RuntimeError):
    """Raised when a platform client rejects a delivery."""


def config_bool(config: dict[str, Any], key: str, *, default: bool = False) -> bool:
    value = config.get(key)
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return bool(value)
    text = str(value).strip().lower()
    if text in {"1", "true", "yes", "on"}:
        return True
    if text in {"0", "false", "no", "off"}:
        return False
    return default


def config_list(config: dict[str, Any], key: str) -> list[str]:
    value = config.get(key)
    if value is None:
        return []
    if isinstance(value, str):
        return [part.strip() for part in value.split(",") if part.strip()]
    if isinstance(value, (list, tuple, set)):
        return [str(item).strip() for item in value if str(item).strip()]
    return [str(value).strip()] if str(value).strip() else []


def chunk_text(text: str, *, max_chars: int) -> list[str]:
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if not text:
        return [""]
    return [text[index : index + max_chars] for index in range(0, len(text), max_chars)]


def compact_media_summary(media_type: str, *, file_name: str | None = None) -> str:
    label = str(media_type or "attachment").strip() or "attachment"
    if file_name:
        return f"[{label}: {file_name}]"
    return f"[{label}]"


def target_from_session_id(session_id: str | None) -> OutboundTarget:
    if not session_id:
        return OutboundTarget(peer_id=None)
    parts = str(session_id).split(":")
    if len(parts) < 3:
        return OutboundTarget(peer_id=None)
    thread_id = parts[3] if len(parts) > 3 and parts[3] else None
    return OutboundTarget(peer_id=parts[2] or None, thread_id=thread_id)


def outbound_target(message: OutboundMessage) -> OutboundTarget:
    identity = message.channel_identity
    if identity is None:
        return target_from_session_id(message.session_id)
    return OutboundTarget(
        peer_id=identity.peer_id,
        thread_id=identity.thread_id,
        peer_type=identity.peer_type,
        user_id=identity.user_id,
    )


def mark_unclaimed(message: OutboundMessage) -> None:
    message.metadata["delivery_status"] = "unclaimed"


def build_inbound_message(
    *,
    channel_id: str,
    kind: str,
    account_id: str,
    peer_id: str,
    content: str,
    message_id: str | None,
    peer_type: str,
    user_id: str | None = None,
    thread_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> InboundMessage:
    identity = ChannelIdentity(
        channel_id=channel_id,
        kind=kind,
        account_id=account_id,
        peer_id=peer_id,
        thread_id=thread_id,
        peer_type=peer_type,
        user_id=user_id,
        message_id=message_id,
    )
    return InboundMessage(
        channel=channel_id,
        content=content,
        session_id=identity.session_id(),
        user_id=user_id,
        message_id=message_id or "",
        channel_identity=identity,
        metadata=metadata or {},
    )
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q
```

Expected result:

```text
5 passed
```

## Task 2: Runtime Factory and Status

**Files:**

- Modify `app-instance/backend/beaver/interfaces/channels/runtime.py`
- Modify `app-instance/backend/tests/unit/test_channel_runtime.py`

Execute this task after the four adapter files exist from Tasks 3 through 6. The factory imports concrete adapter modules, so those modules must be present before the runtime test can pass.

### Steps

- [ ] Add failing tests to `test_channel_runtime.py`:

```python
def test_channel_runtime_builds_platform_adapters_without_starting_networks(tmp_path) -> None:
    runtime = ChannelRuntime(
        service=FakeAgentService(),
        workspace=tmp_path,
        channels={},
    )
    cases = {
        "telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
        "feishu-main": ChannelConfig(enabled=True, kind="feishu", mode="websocket", account_id="tenant-main"),
        "qq-main": ChannelConfig(enabled=True, kind="qqbot", mode="websocket", account_id="qq-main"),
        "weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
    }

    for channel_id, cfg in cases.items():
        adapter = runtime._build_adapter(channel_id, cfg)
        assert adapter.channel_id == channel_id
        assert adapter.kind == cfg.kind
        assert adapter.mode == cfg.mode


def test_channel_runtime_reports_platform_capabilities(tmp_path) -> None:
    runtime = ChannelRuntime(
        service=FakeAgentService(),
        workspace=tmp_path,
        channels={
            "telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
            "weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
        },
    )

    by_id = {item["channel_id"]: item for item in runtime.statuses()}
    assert by_id["telegram-main"]["capabilities"] == [
        "receive_text",
        "send_text",
        "receive_media",
        "groups",
    ]
    assert by_id["weixin-main"]["capabilities"] == [
        "receive_text",
        "send_text",
        "receive_media",
        "direct_messages",
    ]
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
```

Expected first result:

```text
ValueError: Unsupported channel kind/mode
```

- [ ] Add a compact capability helper in `runtime.py`:

```python
def _channel_capabilities(kind: str, mode: str) -> list[str]:
    if kind == "webhook":
        return ["receive_text", "send_text", "sync_webhook_response"]
    if kind == "terminal" and mode == "websocket":
        return ["receive_text", "send_text", "persistent_connection"]
    if kind in {"feishu", "qqbot", "telegram"}:
        return ["receive_text", "send_text", "receive_media", "groups"]
    if kind == "weixin":
        return ["receive_text", "send_text", "receive_media", "direct_messages"]
    return []
```

- [ ] Use `_channel_capabilities(cfg.kind, cfg.mode)` inside `statuses()` while preserving the existing webhook and terminal URL fields.
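One way to wire the helper into `statuses()` is sketched below. This is a minimal sketch, not the existing runtime code: the `ChannelConfig` stand-in, the `status_entry()` helper, and the `webhook_url` field name are assumptions made for illustration, and the real `statuses()` may assemble its entries differently.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ChannelConfig:
    # Hypothetical stand-in holding only the fields this sketch reads.
    enabled: bool
    kind: str
    mode: str


def _channel_capabilities(kind: str, mode: str) -> list[str]:
    # Same helper as in the step above, repeated so the sketch runs on its own.
    if kind == "webhook":
        return ["receive_text", "send_text", "sync_webhook_response"]
    if kind == "terminal" and mode == "websocket":
        return ["receive_text", "send_text", "persistent_connection"]
    if kind in {"feishu", "qqbot", "telegram"}:
        return ["receive_text", "send_text", "receive_media", "groups"]
    if kind == "weixin":
        return ["receive_text", "send_text", "receive_media", "direct_messages"]
    return []


def status_entry(
    channel_id: str,
    cfg: ChannelConfig,
    existing: dict[str, Any] | None = None,
) -> dict[str, Any]:
    # Hypothetical helper: copy the fields the status already carries
    # (for example URL fields), then set the identity and capability fields.
    entry = dict(existing or {})
    entry.update(
        channel_id=channel_id,
        kind=cfg.kind,
        mode=cfg.mode,
        capabilities=_channel_capabilities(cfg.kind, cfg.mode),
    )
    return entry
```

Copying the existing entry first keeps any previously reported fields untouched, so only the capability list changes for webhook and terminal channels.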
- [ ] Extend `_build_adapter()`:

```python
if cfg.kind == "telegram" and cfg.mode in {"polling", "webhook"}:
    from beaver.interfaces.channels.platforms.telegram import TelegramAdapter

    return TelegramAdapter(
        channel_id=channel_id,
        kind=cfg.kind,
        mode=cfg.mode,
        account_id=cfg.account_id,
        display_name=cfg.display_name,
        inbound_sink=self,
        secrets=cfg.secrets,
        config=cfg.config,
        event_recorder=self.record_event,
    )
if cfg.kind == "feishu" and cfg.mode in {"websocket", "webhook"}:
    from beaver.interfaces.channels.platforms.feishu import FeishuAdapter

    return FeishuAdapter(
        channel_id=channel_id,
        kind=cfg.kind,
        mode=cfg.mode,
        account_id=cfg.account_id,
        display_name=cfg.display_name,
        inbound_sink=self,
        secrets=cfg.secrets,
        config=cfg.config,
        event_recorder=self.record_event,
    )
if cfg.kind == "qqbot" and cfg.mode == "websocket":
    from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter

    return QQBotAdapter(
        channel_id=channel_id,
        kind=cfg.kind,
        mode=cfg.mode,
        account_id=cfg.account_id,
        display_name=cfg.display_name,
        inbound_sink=self,
        secrets=cfg.secrets,
        config=cfg.config,
        event_recorder=self.record_event,
    )
if cfg.kind == "weixin" and cfg.mode == "polling":
    from beaver.interfaces.channels.platforms.weixin import WeixinAdapter

    return WeixinAdapter(
        channel_id=channel_id,
        kind=cfg.kind,
        mode=cfg.mode,
        account_id=cfg.account_id,
        display_name=cfg.display_name,
        inbound_sink=self,
        secrets=cfg.secrets,
        config=cfg.config,
        event_recorder=self.record_event,
    )
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
```

Expected result includes:

```text
passed
```

## Task 3: TelegramAdapter

**Files:**

- Create `app-instance/backend/beaver/interfaces/channels/platforms/telegram.py`
- Create `app-instance/backend/tests/unit/test_telegram_channel_adapter.py`

### Steps

- [ ] Write tests using fake sink and fake client:

```python
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.telegram import TelegramAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeTelegramClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_message(self, **kwargs):
        self.sent.append(kwargs)


def test_telegram_normalizes_private_text_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={},
            client=FakeTelegramClient(),
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 100,
                    "text": "hello",
                    "chat": {"id": 200, "type": "private"},
                    "from": {"id": 300, "username": "ivan"},
                }
            }
        )
        message = sink.messages[0]
        assert message.channel == "telegram-main"
        assert message.content == "hello"
        assert message.session_id == "telegram-main:bot-main:200"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "300"
        assert message.channel_identity.message_id == "100"

    asyncio.run(run())


def test_telegram_group_requires_mention_when_configured() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={"requireMentionInGroups": True, "botUsername": "beaver_bot"},
            client=FakeTelegramClient(),
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 101,
                    "text": "hello group",
                    "chat": {"id": -20, "type": "group"},
                    "from": {"id": 300},
                }
            }
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 102,
                    "text": "@beaver_bot hello",
                    "chat": {"id": -20, "type": "group"},
                    "from": {"id": 300},
                }
            }
        )
        assert len(sink.messages) == 1
        assert sink.messages[0].content == "hello"

    asyncio.run(run())


def test_telegram_sends_chunked_reply_to_identity_target() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeTelegramClient()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={"maxMessageChars": 3},
            client=client,
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 100,
                    "text": "hello",
                    "chat": {"id": 200, "type": "private"},
                    "from": {"id": 300},
                }
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="telegram-main",
                content="abcdef",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )
        assert [item["text"] for item in client.sent] == ["abc", "def"]
        assert client.sent[0]["chat_id"] == "200"

    asyncio.run(run())
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q
```

Expected first result:

```text
ModuleNotFoundError
```

- [ ] Implement `TelegramAdapter` with these methods:

```python
class TelegramAdapter:
    channel_id: str
    kind: str
    mode: str

    def __init__(..., client: Any | None = None, application_factory: Any | None = None) -> None: ...

    async def start(self) -> None:
        # An injected client (unit tests) skips all live setup.
        if self._client is not None:
            return
        if self.mode == "polling":
            self._application = self._build_application()
            await self._application.initialize()
            await self._application.start()
            await self._application.updater.start_polling()
            self._client = self._application.bot
            return
        if self.mode == "webhook":
            self._client = self._build_bot()
            return
        raise ValueError(f"Unsupported telegram mode: {self.mode}")

    async def stop(self) -> None:
        if self._application is not None:
            await self._application.updater.stop()
            await self._application.stop()
            await self._application.shutdown()

    async def handle_update_payload(self, payload: dict[str, Any]) -> None:
        message = self._normalize_payload(payload)
        if message is not None:
            await self.inbound_sink.accept_inbound(message)

    async def send(self, message: OutboundMessage) -> None:
        target = outbound_target(message)
        if not target.peer_id:
            # No resolvable peer: record the miss instead of calling the platform.
            mark_unclaimed(message)
            return
        client = self._require_client()
        for chunk in chunk_text(message.content, max_chars=self.max_message_chars):
            await client.send_message(chat_id=target.peer_id, text=chunk)
```

- [ ] Normalization requirements:
  - Use `message.text` or `message.caption`.
  - Map private chats to `peer_type="dm"`.
  - Map group and supergroup chats to `peer_type="group"`, and channel chats to `peer_type="channel"`.
  - Use `message.message_thread_id` as `thread_id` when present.
  - For photo/document/audio/video, append `compact_media_summary()` to content and add compact media metadata.
  - If `requireMentionInGroups` is true, accept only group text containing `@{botUsername}` (taken from `config.botUsername`), then strip that mention from content.
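The normalization requirements above can be sketched as a standalone function. This is a minimal sketch under stated assumptions, not the adapter's final `_normalize_payload()`: the function name `normalize_telegram_message`, its dict return shape, and the `PEER_TYPES` table are hypothetical, and `compact_media_summary()` is a local copy of the Task 1 helper so the block runs on its own.

```python
from __future__ import annotations

from typing import Any

# Assumed mapping from Telegram chat types to normalized peer types.
PEER_TYPES = {"private": "dm", "group": "group", "supergroup": "group", "channel": "channel"}
MEDIA_KEYS = ("photo", "document", "audio", "video")


def compact_media_summary(media_type: str, *, file_name: str | None = None) -> str:
    # Local copy of the Task 1 helper.
    return f"[{media_type}: {file_name}]" if file_name else f"[{media_type}]"


def normalize_telegram_message(
    payload: dict[str, Any],
    *,
    bot_username: str = "",
    require_mention_in_groups: bool = False,
) -> dict[str, Any] | None:
    message = payload.get("message")
    if not isinstance(message, dict):
        return None
    chat = message.get("chat") or {}
    if "id" not in chat:
        return None
    peer_type = PEER_TYPES.get(str(chat.get("type")), "unknown")
    content = str(message.get("text") or message.get("caption") or "")

    # Mention gate: outside direct messages, drop text that lacks the mention.
    if peer_type != "dm" and require_mention_in_groups:
        mention = f"@{bot_username}"
        if not bot_username or mention not in content:
            return None
        content = content.replace(mention, "", 1).strip()

    media: list[dict[str, Any]] = []
    for key in MEDIA_KEYS:
        item = message.get(key)
        if not item:
            continue
        # Photos arrive as a list of sizes; other media are single objects.
        file_name = item.get("file_name") if isinstance(item, dict) else None
        media.append({"type": key, "file_name": file_name})
        content = f"{content} {compact_media_summary(key, file_name=file_name)}".strip()

    sender = message.get("from") or {}
    thread_id = message.get("message_thread_id")
    return {
        "content": content,
        "peer_id": str(chat["id"]),
        "peer_type": peer_type,
        "thread_id": str(thread_id) if thread_id is not None else None,
        "user_id": str(sender["id"]) if "id" in sender else None,
        "message_id": str(message["message_id"]) if "message_id" in message else None,
        "metadata": {"media": media} if media else {},
    }
```

In the adapter, the returned fields would feed `build_inbound_message()` from Task 1, and a `None` result means the update is ignored.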
- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q
```

Expected result:

```text
3 passed
```

## Task 4: FeishuAdapter

**Files:**

- Create `app-instance/backend/beaver/interfaces/channels/platforms/feishu.py`
- Create `app-instance/backend/tests/unit/test_feishu_channel_adapter.py`

### Steps

- [ ] Write tests:

```python
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.feishu import FeishuAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeFeishuClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, receive_id_type: str, receive_id: str, text: str):
        self.sent.append(
            {"receive_id_type": receive_id_type, "receive_id": receive_id, "text": text}
        )


def test_feishu_normalizes_direct_text_event() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={},
            client=FakeFeishuClient(),
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_chat",
                        "chat_type": "p2p",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "feishu-main:tenant-main:oc_chat"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "ou_user"

    asyncio.run(run())


def test_feishu_group_mention_gate() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={"requireMentionInGroups": True, "botOpenId": "ou_bot"},
            client=FakeFeishuClient(),
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_group",
                        "chat_type": "group",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                        "mentions": [],
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m2",
                        "chat_id": "oc_group",
                        "chat_type": "group",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                        "mentions": [{"id": {"open_id": "ou_bot"}}],
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        assert len(sink.messages) == 1

    asyncio.run(run())


def test_feishu_sends_text_to_chat_id() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeFeishuClient()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={},
            client=client,
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_chat",
                        "chat_type": "p2p",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="feishu-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )
        assert client.sent == [
            {"receive_id_type": "chat_id", "receive_id": "oc_chat", "text": "ok"}
        ]

    asyncio.run(run())
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q
```

Expected first result:

```text
ModuleNotFoundError
```

- [ ] Implement `FeishuAdapter`:
  - Parse `event.message.content` as JSON when possible.
  - `message_type="text"` reads `content["text"]`.
  - Non-text message types use `compact_media_summary(message_type, file_name=...)`.
  - `chat_type="p2p"` maps to `peer_type="dm"`.
  - Any other chat type maps to `peer_type="group"`.
  - `sender.sender_id.open_id` becomes `user_id`.
  - Mention gate checks `message.mentions[*].id.open_id` against `config.botOpenId`.
  - `send()` calls `client.send_text(receive_id_type="chat_id", receive_id=target.peer_id, text=chunk)`.
  - Live client creation is isolated in `_build_client()` and imports `lark_oapi` only inside that method.

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q
```

Expected result:

```text
3 passed
```

## Task 5: QQBotAdapter

**Files:**

- Create `app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py`
- Create `app-instance/backend/tests/unit/test_qqbot_channel_adapter.py`

### Steps

- [ ] Write tests:

```python
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeQQBotClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, peer_type: str, peer_id: str, content: str, message_id: str | None):
        self.sent.append(
            {
                "peer_type": peer_type,
                "peer_id": peer_id,
                "content": content,
                "message_id": message_id,
            }
        )


def test_qqbot_normalizes_private_c2c_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=FakeQQBotClient(),
        )
        await adapter.handle_event_payload(
            {
                "t": "C2C_MESSAGE_CREATE",
                "d": {
                    "id": "m1",
                    "author": {"user_openid": "u1"},
                    "content": "hello",
                },
            }
        )
        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "qq-main:qq-bot:u1"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "u1"

    asyncio.run(run())


def test_qqbot_normalizes_group_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=FakeQQBotClient(),
        )
        await adapter.handle_event_payload(
            {
                "t": "GROUP_AT_MESSAGE_CREATE",
                "d": {
                    "id": "m2",
                    "group_openid": "g1",
                    "author": {"member_openid": "u1"},
                    "content": "hello group",
                },
            }
        )
        message = sink.messages[0]
        assert message.session_id == "qq-main:qq-bot:g1"
        assert message.channel_identity.peer_type == "group"
        assert message.channel_identity.user_id == "u1"

    asyncio.run(run())


def test_qqbot_sends_reply_with_original_message_id() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeQQBotClient()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=client,
        )
        await adapter.handle_event_payload(
            {
                "t": "GROUP_AT_MESSAGE_CREATE",
                "d": {
                    "id": "m2",
                    "group_openid": "g1",
                    "author": {"member_openid": "u1"},
                    "content": "hello group",
                },
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="qq-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )
        assert client.sent[0] == {
            "peer_type": "group",
            "peer_id": "g1",
            "content": "ok",
            "message_id": "m2",
        }

    asyncio.run(run())
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q
```

Expected first result:

```text
ModuleNotFoundError
```

- [ ] Implement `QQBotAdapter`:
  - `C2C_MESSAGE_CREATE`: `peer_id=author.user_openid`, `peer_type="dm"`.
  - `GROUP_AT_MESSAGE_CREATE`: `peer_id=group_openid`, `peer_type="group"`, `user_id=author.member_openid`.
  - Guild/channel events with `guild_id` and `channel_id`: `peer_id=channel_id`, `thread_id=guild_id`, `peer_type="channel"`.
  - Attachment arrays become compact media summaries and metadata entries.
  - Apply `dmPolicy`, `groupPolicy`, `allowFrom`, and `groupAllowFrom` before accepting.
  - `send()` calls the injected client `send_text()`.
  - Live websocket and REST client setup live behind methods that import optional packages only on start.

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q
```

Expected result:

```text
3 passed
```

## Task 6: WeixinAdapter

**Files:**

- Create `app-instance/backend/beaver/interfaces/channels/platforms/weixin.py`
- Create `app-instance/backend/tests/unit/test_weixin_channel_adapter.py`

### Steps

- [ ] Write tests:

```python
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeWeixinClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, peer_id: str, text: str, context_token: str | None):
        self.sent.append({"peer_id": peer_id, "text": text, "context_token": context_token})


def test_weixin_normalizes_direct_text_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={},
            client=FakeWeixinClient(),
        )
        await adapter.handle_message_payload(
            {
                "id": "m1",
                "from": "wx_user",
                "room_id": "",
                "type": "text",
                "text": "hello",
                "context_token": "ctx1",
            }
        )
        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "weixin-main:wx-main:wx_user"
        assert message.channel_identity.peer_type == "dm"
        assert message.metadata["context_token"] == "ctx1"

    asyncio.run(run())


def test_weixin_group_message_is_best_effort() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={"groupPolicy": "open"},
            client=FakeWeixinClient(),
        )
        await adapter.handle_message_payload(
            {
                "id": "m2",
                "from": "wx_user",
                "room_id": "room1",
                "type": "text",
                "text": "hello room",
                "context_token": "ctx2",
            }
        )
        message = sink.messages[0]
        assert message.session_id == "weixin-main:wx-main:room1"
        assert message.channel_identity.peer_type == "group"
        assert message.channel_identity.user_id == "wx_user"

    asyncio.run(run())


def test_weixin_sends_text_with_context_token() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeWeixinClient()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={},
            client=client,
        )
        await adapter.handle_message_payload(
            {
                "id": "m1",
                "from": "wx_user",
                "type": "text",
                "text": "hello",
                "context_token": "ctx1",
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="weixin-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
                metadata={"inbound_metadata": sink.messages[0].metadata},
            )
        )
        assert client.sent == [{"peer_id": "wx_user", "text": "ok", "context_token": "ctx1"}]

    asyncio.run(run())
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_weixin_channel_adapter.py -q
```

Expected first result:

```text
ModuleNotFoundError
```

- [ ] Implement `WeixinAdapter`:
  - Direct messages use `peer_id=from`, `peer_type="dm"`.
  - Group messages use `peer_id=room_id`, `peer_type="group"`, `user_id=from`.
  - Default `dmPolicy` is `open`.
  - Default `groupPolicy` is `disabled`.
  - Preserve `context_token` in inbound metadata.
- `send()` reads `context_token` from `message.metadata["inbound_metadata"]` when available. - Live polling setup uses configured `baseUrl`, `cdnBaseUrl`, and `token`. - QR credential loading is represented as explicit config/state read inside the live client builder. - [ ] Run: ```bash cd app-instance/backend uv run pytest tests/unit/test_weixin_channel_adapter.py -q ``` Expected result: ```text 3 passed ``` ## Task 7: Import Guards and Optional Extras **Files:** - Modify `app-instance/backend/pyproject.toml` - Modify `app-instance/backend/tests/unit/test_imports.py` ### Steps - [ ] Update `pyproject.toml` optional extras exactly as described in Dependency Strategy. - [ ] Add import tests: ```python def test_platform_channel_modules_import_without_live_clients() -> None: from beaver.interfaces.channels.platforms.feishu import FeishuAdapter from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter from beaver.interfaces.channels.platforms.telegram import TelegramAdapter from beaver.interfaces.channels.platforms.weixin import WeixinAdapter assert FeishuAdapter.KIND == "feishu" assert QQBotAdapter.KIND == "qqbot" assert TelegramAdapter.KIND == "telegram" assert WeixinAdapter.KIND == "weixin" ``` - [ ] Run: ```bash cd app-instance/backend uv run pytest tests/unit/test_imports.py -q ``` Expected result: ```text passed ``` ## Task 8: Runtime Startup Isolation **Files:** - Modify `app-instance/backend/tests/unit/test_channel_runtime.py` - Modify adapter files as needed ### Steps - [ ] Add a startup isolation test: ```python def test_channel_runtime_platform_start_failure_does_not_stop_other_channels(tmp_path) -> None: async def run() -> None: runtime = ChannelRuntime( service=FakeAgentService(), workspace=tmp_path, channels={ "telegram-main": ChannelConfig( enabled=True, kind="telegram", mode="polling", account_id="bot-main", secrets={}, ), "off": ChannelConfig( enabled=False, kind="weixin", mode="polling", account_id="wx-main", ), }, ) await runtime.start() 
        try:
            by_id = {item["channel_id"]: item for item in runtime.statuses()}
        finally:
            await runtime.stop()

        assert by_id["telegram-main"]["state"] == "error"
        assert "botToken" in by_id["telegram-main"]["last_error"]
        assert by_id["off"]["state"] == "disabled"

    asyncio.run(run())
```

- [ ] Ensure each live adapter builder validates required secrets only when `start()` needs a live client:

```python
def _require_secret(self, key: str) -> str:
    value = self.secrets.get(key)
    if not value:
        raise ValueError(f"{key} is required")
    return str(value)
```

- [ ] Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
```

Expected result:

```text
passed
```

## Task 9: Cross-Channel Verification

**Files:** no new files unless a previous task reveals a test gap.

### Steps

- [ ] Run all focused channel tests:

```bash
cd app-instance/backend
uv run pytest \
  tests/unit/test_platform_channel_helpers.py \
  tests/unit/test_telegram_channel_adapter.py \
  tests/unit/test_feishu_channel_adapter.py \
  tests/unit/test_qqbot_channel_adapter.py \
  tests/unit/test_weixin_channel_adapter.py \
  tests/unit/test_channel_runtime.py \
  tests/unit/test_imports.py \
  -q
```

Expected result:

```text
passed
```

- [ ] Run the broader backend unit suite:

```bash
cd app-instance/backend
uv run pytest tests/unit -q
```

Expected result:

```text
passed
```

- [ ] Inspect changed files:

```bash
git status --short
git diff -- app-instance/backend/beaver/interfaces/channels app-instance/backend/tests/unit app-instance/backend/pyproject.toml
```

- [ ] Confirm these invariants manually from the diff:
  - `ChannelRuntime` still owns `MessageBus` and agent dispatch.
  - Platform adapters call only `ChannelInboundSink.accept_inbound()` for inbound routing.
  - Platform adapters do not call `AgentService`.
  - Optional platform packages are imported only inside live client or live transport builders.
  - Tests use fake clients and fake payloads.
  - No adapter sends when the outbound target cannot be resolved; it marks `delivery_status="unclaimed"` instead.

## Implementation Order

1. Task 1: Shared Platform Helpers
2. Task 3: TelegramAdapter
3. Task 4: FeishuAdapter
4. Task 5: QQBotAdapter
5. Task 6: WeixinAdapter
6. Task 2: Runtime Factory and Status
7. Task 7: Import Guards and Optional Extras
8. Task 8: Runtime Startup Isolation
9. Task 9: Cross-Channel Verification

This order proves the shared adapter pattern with Telegram first, then adds the two official bot-style channels, Feishu and QQBot. Weixin comes last among the adapters because context tokens and login state make it the most specialized. Only after all four adapters exist are the concrete modules connected to the runtime factory.
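
As a reference point for Task 6, the Weixin normalization rules can be sketched as a pure function. This is a minimal sketch under stated assumptions, not the adapter itself: `PeerSketch` and `normalize_weixin_payload` are hypothetical names, the `channel_id:account_id:peer_id` session format is inferred from the group test assertion, treating every policy value other than `open` as blocking is an assumption, and so is setting `user_id` to the sender for direct messages.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class PeerSketch:
    """Hypothetical stand-in for the ChannelIdentity fields used in Task 6."""

    peer_id: str
    peer_type: str  # "dm" or "group"
    user_id: str


def normalize_weixin_payload(
    payload: Mapping[str, Any],
    config: Mapping[str, Any],
    channel_id: str,
    account_id: str,
) -> Optional[dict[str, Any]]:
    """Return normalized fields, or None when the configured policy blocks the message."""
    sender = str(payload["from"])
    room_id = payload.get("room_id")

    if room_id:
        # Group delivery is opt-in: the default groupPolicy is "disabled".
        if config.get("groupPolicy", "disabled") != "open":
            return None
        peer = PeerSketch(peer_id=str(room_id), peer_type="group", user_id=sender)
    else:
        # Direct messages are accepted by default: the default dmPolicy is "open".
        if config.get("dmPolicy", "open") != "open":
            return None
        # Assumption: for a DM the user and the peer are the same sender.
        peer = PeerSketch(peer_id=sender, peer_type="dm", user_id=sender)

    return {
        # Assumed format, inferred from "weixin-main:wx-main:room1" in the group test.
        "session_id": f"{channel_id}:{account_id}:{peer.peer_id}",
        "identity": peer,
        # Keep the context token so a later send() can read it back.
        "metadata": {"context_token": payload.get("context_token")},
    }
```

Keeping this logic free of transport concerns is one way to let the fake-client tests cover both policy defaults without any polling setup.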