Files
beaver_project/docs/superpowers/plans/2026-06-02-chat-platform-channel-adapters.md

44 KiB

Chat Platform Channel Adapters Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development for implementation across the independent adapter tasks, or superpowers:executing-plans for inline execution. Track progress with the checkbox steps below.

Goal: Add first-class Beaver adapters for telegram, feishu, qqbot, and weixin, each plugged into ChannelRuntime, normalizing inbound platform events into InboundMessage with ChannelIdentity, and delivering final OutboundMessage replies back to the original platform conversation.

Architecture: Platform adapters are hosted in the backend process. They own transport and platform delivery details. ChannelRuntime remains the owner of lifecycle, dedupe, event logging, bus routing, and agent dispatch.

platform SDK/API
-> {Channel}Adapter
-> ChannelRuntime.accept_inbound()
-> MessageBus.inbound
-> ChannelRuntime bridge
-> AgentService.handle_inbound_message()
-> MessageBus.outbound
-> ChannelManager.dispatch_outbound()
-> {Channel}Adapter.send()
-> platform SDK/API

Locked Decisions

  • Implement these adapters only: FeishuAdapter, QQBotAdapter, TelegramAdapter, WeixinAdapter.
  • Do not add WhatsApp in this phase.
  • Do not add a sidecar process as the default channel path.
  • Keep adapter class names platform-only: FeishuAdapter, QQBotAdapter, TelegramAdapter, WeixinAdapter.
  • Keep platform imports lazy, so backend imports and unrelated channels work without every optional package installed.
  • Adapter constructors must accept injectable clients or factories for unit tests.
  • Unit tests must not call real platform networks.
  • Runtime adapter selection is by existing ChannelConfig.kind and ChannelConfig.mode.
  • Inbound messages must include ChannelIdentity.
  • Outbound send targets must prefer message.channel_identity, then fall back to the normalized message.session_id.
  • Unsupported or incomplete media appears in content and compact metadata instead of being silently dropped.
  • Streaming token deltas, pairing flows, hot reload, and platform config UI are not included.
  • Weixin group delivery is best-effort only and is not listed as a stable group capability.

File Structure

  • Create: app-instance/backend/beaver/interfaces/channels/platforms/__init__.py
  • Create: app-instance/backend/beaver/interfaces/channels/platforms/base.py
  • Create: app-instance/backend/beaver/interfaces/channels/platforms/telegram.py
  • Create: app-instance/backend/beaver/interfaces/channels/platforms/feishu.py
  • Create: app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py
  • Create: app-instance/backend/beaver/interfaces/channels/platforms/weixin.py
  • Modify: app-instance/backend/beaver/interfaces/channels/runtime.py
  • Modify: app-instance/backend/pyproject.toml
  • Create: app-instance/backend/tests/unit/test_platform_channel_helpers.py
  • Create: app-instance/backend/tests/unit/test_telegram_channel_adapter.py
  • Create: app-instance/backend/tests/unit/test_feishu_channel_adapter.py
  • Create: app-instance/backend/tests/unit/test_qqbot_channel_adapter.py
  • Create: app-instance/backend/tests/unit/test_weixin_channel_adapter.py
  • Modify: app-instance/backend/tests/unit/test_channel_runtime.py
  • Modify: app-instance/backend/tests/unit/test_imports.py

Dependency Strategy

Add optional extras for live adapters. Do not put these packages in the base backend dependency list.

[project.optional-dependencies]
dev = [
    "pytest>=9.0.0,<10.0.0",
]
telegram = [
    "python-telegram-bot>=22.0,<23.0",
]
feishu = [
    "lark-oapi>=1.4.22,<2.0.0",
]
qqbot = [
    "aiohttp>=3.9.0,<4.0.0",
]
weixin = [
    "aiohttp>=3.9.0,<4.0.0",
]
channels = [
    "python-telegram-bot>=22.0,<23.0",
    "lark-oapi>=1.4.22,<2.0.0",
    "aiohttp>=3.9.0,<4.0.0",
]

The plan keeps httpx as the default HTTP client where it is already sufficient. Use aiohttp only for platform SDK paths that require it.

Shared Data Rules

All normalized identities use:

<channel_id>:<account_id>:<peer_id>[:<thread_id>]

Outbound target resolution:

def outbound_target(message: OutboundMessage) -> OutboundTarget:
    if message.channel_identity is not None:
        identity = message.channel_identity
        return OutboundTarget(
            peer_id=identity.peer_id,
            thread_id=identity.thread_id,
            peer_type=identity.peer_type,
            user_id=identity.user_id,
        )
    return target_from_session_id(message.session_id)

If neither identity nor session id can produce a peer id, adapter send() sets message.metadata["delivery_status"] = "unclaimed" and returns without calling the platform client.

Task 1: Shared Platform Helpers

Files:

  • Create app-instance/backend/beaver/interfaces/channels/platforms/__init__.py
  • Create app-instance/backend/beaver/interfaces/channels/platforms/base.py
  • Create app-instance/backend/tests/unit/test_platform_channel_helpers.py

Steps

  • Write failing tests in test_platform_channel_helpers.py:
from beaver.foundation.events import ChannelIdentity, OutboundMessage
from beaver.interfaces.channels.platforms.base import (
    chunk_text,
    compact_media_summary,
    config_bool,
    config_list,
    outbound_target,
)


def test_config_helpers_normalize_common_values() -> None:
    assert config_bool({"enabled": "true"}, "enabled", default=False) is True
    assert config_bool({"enabled": "0"}, "enabled", default=True) is False
    assert config_list({"allowFrom": "u1,u2"}, "allowFrom") == ["u1", "u2"]
    assert config_list({"allowFrom": ["u1", 2]}, "allowFrom") == ["u1", "2"]


def test_chunk_text_preserves_order_and_limit() -> None:
    chunks = chunk_text("abcdef", max_chars=2)
    assert chunks == ["ab", "cd", "ef"]


def test_outbound_target_prefers_channel_identity() -> None:
    identity = ChannelIdentity(
        channel_id="telegram-main",
        kind="telegram",
        account_id="bot-main",
        peer_id="chat-1",
        thread_id="topic-1",
        peer_type="group",
        user_id="user-1",
    )
    message = OutboundMessage(
        channel="telegram-main",
        content="ok",
        session_id="ignored",
        finish_reason="stop",
        channel_identity=identity,
    )

    target = outbound_target(message)

    assert target.peer_id == "chat-1"
    assert target.thread_id == "topic-1"
    assert target.peer_type == "group"
    assert target.user_id == "user-1"


def test_outbound_target_falls_back_to_session_id() -> None:
    message = OutboundMessage(
        channel="telegram-main",
        content="ok",
        session_id="telegram-main:bot-main:chat-1:topic-1",
        finish_reason="stop",
    )

    target = outbound_target(message)

    assert target.peer_id == "chat-1"
    assert target.thread_id == "topic-1"


def test_compact_media_summary_mentions_attachment_type() -> None:
    assert compact_media_summary("photo", file_name="cat.png") == "[photo: cat.png]"
    assert compact_media_summary("document") == "[document]"
  • Run the focused test and confirm it fails because the module does not exist:
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q

Expected first result:

ModuleNotFoundError: No module named 'beaver.interfaces.channels.platforms'
  • Implement platforms/__init__.py:
"""Platform channel adapters."""
  • Implement platforms/base.py:
"""Shared helpers for platform channel adapters."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage


@dataclass(slots=True)
class OutboundTarget:
    peer_id: str | None
    thread_id: str | None = None
    peer_type: str = "unknown"
    user_id: str | None = None


class PlatformDeliveryError(RuntimeError):
    """Raised when a platform client rejects a delivery."""


def config_bool(config: dict[str, Any], key: str, *, default: bool = False) -> bool:
    value = config.get(key)
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return bool(value)
    text = str(value).strip().lower()
    if text in {"1", "true", "yes", "on"}:
        return True
    if text in {"0", "false", "no", "off"}:
        return False
    return default


def config_list(config: dict[str, Any], key: str) -> list[str]:
    value = config.get(key)
    if value is None:
        return []
    if isinstance(value, str):
        return [part.strip() for part in value.split(",") if part.strip()]
    if isinstance(value, (list, tuple, set)):
        return [str(item).strip() for item in value if str(item).strip()]
    return [str(value).strip()] if str(value).strip() else []


def chunk_text(text: str, *, max_chars: int) -> list[str]:
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if not text:
        return [""]
    return [text[index : index + max_chars] for index in range(0, len(text), max_chars)]


def compact_media_summary(media_type: str, *, file_name: str | None = None) -> str:
    label = str(media_type or "attachment").strip() or "attachment"
    if file_name:
        return f"[{label}: {file_name}]"
    return f"[{label}]"


def target_from_session_id(session_id: str | None) -> OutboundTarget:
    if not session_id:
        return OutboundTarget(peer_id=None)
    parts = str(session_id).split(":")
    if len(parts) < 3:
        return OutboundTarget(peer_id=None)
    thread_id = parts[3] if len(parts) > 3 and parts[3] else None
    return OutboundTarget(peer_id=parts[2] or None, thread_id=thread_id)


def outbound_target(message: OutboundMessage) -> OutboundTarget:
    identity = message.channel_identity
    if identity is None:
        return target_from_session_id(message.session_id)
    return OutboundTarget(
        peer_id=identity.peer_id,
        thread_id=identity.thread_id,
        peer_type=identity.peer_type,
        user_id=identity.user_id,
    )


def mark_unclaimed(message: OutboundMessage) -> None:
    message.metadata["delivery_status"] = "unclaimed"


def build_inbound_message(
    *,
    channel_id: str,
    kind: str,
    account_id: str,
    peer_id: str,
    content: str,
    message_id: str | None,
    peer_type: str,
    user_id: str | None = None,
    thread_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> InboundMessage:
    identity = ChannelIdentity(
        channel_id=channel_id,
        kind=kind,
        account_id=account_id,
        peer_id=peer_id,
        thread_id=thread_id,
        peer_type=peer_type,
        user_id=user_id,
        message_id=message_id,
    )
    return InboundMessage(
        channel=channel_id,
        content=content,
        session_id=identity.session_id(),
        user_id=user_id,
        message_id=message_id or "",
        channel_identity=identity,
        metadata=metadata or {},
    )
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q

Expected result:

5 passed

Task 2: Runtime Factory and Status

Files:

  • Modify app-instance/backend/beaver/interfaces/channels/runtime.py
  • Modify app-instance/backend/tests/unit/test_channel_runtime.py

Execute this task after the four adapter files exist from Tasks 3 through 6. The factory imports concrete adapter modules, so those modules must be present before the runtime test can pass.

Steps

  • Add failing tests to test_channel_runtime.py:
def test_channel_runtime_builds_platform_adapters_without_starting_networks(tmp_path) -> None:
    runtime = ChannelRuntime(
        service=FakeAgentService(),
        workspace=tmp_path,
        channels={},
    )

    cases = {
        "telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
        "feishu-main": ChannelConfig(enabled=True, kind="feishu", mode="websocket", account_id="tenant-main"),
        "qq-main": ChannelConfig(enabled=True, kind="qqbot", mode="websocket", account_id="qq-main"),
        "weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
    }

    for channel_id, cfg in cases.items():
        adapter = runtime._build_adapter(channel_id, cfg)
        assert adapter.channel_id == channel_id
        assert adapter.kind == cfg.kind
        assert adapter.mode == cfg.mode


def test_channel_runtime_reports_platform_capabilities(tmp_path) -> None:
    runtime = ChannelRuntime(
        service=FakeAgentService(),
        workspace=tmp_path,
        channels={
            "telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
            "weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
        },
    )

    by_id = {item["channel_id"]: item for item in runtime.statuses()}

    assert by_id["telegram-main"]["capabilities"] == [
        "receive_text",
        "send_text",
        "receive_media",
        "groups",
    ]
    assert by_id["weixin-main"]["capabilities"] == [
        "receive_text",
        "send_text",
        "receive_media",
        "direct_messages",
    ]
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q

Expected first result:

ValueError: Unsupported channel kind/mode
  • Add a compact capability helper in runtime.py:
def _channel_capabilities(kind: str, mode: str) -> list[str]:
    if kind == "webhook":
        return ["receive_text", "send_text", "sync_webhook_response"]
    if kind == "terminal" and mode == "websocket":
        return ["receive_text", "send_text", "persistent_connection"]
    if kind in {"feishu", "qqbot", "telegram"}:
        return ["receive_text", "send_text", "receive_media", "groups"]
    if kind == "weixin":
        return ["receive_text", "send_text", "receive_media", "direct_messages"]
    return []
  • Use _channel_capabilities(cfg.kind, cfg.mode) inside statuses() while preserving the existing webhook and terminal URL fields.

  • Extend _build_adapter():

        if cfg.kind == "telegram" and cfg.mode in {"polling", "webhook"}:
            from beaver.interfaces.channels.platforms.telegram import TelegramAdapter

            return TelegramAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )

        if cfg.kind == "feishu" and cfg.mode in {"websocket", "webhook"}:
            from beaver.interfaces.channels.platforms.feishu import FeishuAdapter

            return FeishuAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )

        if cfg.kind == "qqbot" and cfg.mode == "websocket":
            from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter

            return QQBotAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )

        if cfg.kind == "weixin" and cfg.mode == "polling":
            from beaver.interfaces.channels.platforms.weixin import WeixinAdapter

            return WeixinAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q

Expected result includes:

passed

Task 3: TelegramAdapter

Files:

  • Create app-instance/backend/beaver/interfaces/channels/platforms/telegram.py
  • Create app-instance/backend/tests/unit/test_telegram_channel_adapter.py

Steps

  • Write tests using fake sink and fake client:
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.telegram import TelegramAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeTelegramClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_message(self, **kwargs):
        self.sent.append(kwargs)


def test_telegram_normalizes_private_text_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={},
            client=FakeTelegramClient(),
        )

        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 100,
                    "text": "hello",
                    "chat": {"id": 200, "type": "private"},
                    "from": {"id": 300, "username": "ivan"},
                }
            }
        )

        message = sink.messages[0]
        assert message.channel == "telegram-main"
        assert message.content == "hello"
        assert message.session_id == "telegram-main:bot-main:200"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "300"
        assert message.channel_identity.message_id == "100"

    asyncio.run(run())


def test_telegram_group_requires_mention_when_configured() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={"requireMentionInGroups": True, "botUsername": "beaver_bot"},
            client=FakeTelegramClient(),
        )

        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 101,
                    "text": "hello group",
                    "chat": {"id": -20, "type": "group"},
                    "from": {"id": 300},
                }
            }
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 102,
                    "text": "@beaver_bot hello",
                    "chat": {"id": -20, "type": "group"},
                    "from": {"id": 300},
                }
            }
        )

        assert len(sink.messages) == 1
        assert sink.messages[0].content == "hello"

    asyncio.run(run())


def test_telegram_sends_chunked_reply_to_identity_target() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeTelegramClient()
        adapter = TelegramAdapter(
            channel_id="telegram-main",
            kind="telegram",
            mode="polling",
            account_id="bot-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"botToken": "x"},
            config={"maxMessageChars": 3},
            client=client,
        )
        await adapter.handle_update_payload(
            {
                "message": {
                    "message_id": 100,
                    "text": "hello",
                    "chat": {"id": 200, "type": "private"},
                    "from": {"id": 300},
                }
            }
        )

        await adapter.send(
            OutboundMessage(
                channel="telegram-main",
                content="abcdef",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )

        assert [item["text"] for item in client.sent] == ["abc", "def"]
        assert client.sent[0]["chat_id"] == "200"

    asyncio.run(run())
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q

Expected first result:

ModuleNotFoundError
  • Implement TelegramAdapter with these methods:
class TelegramAdapter:
    channel_id: str
    kind: str
    mode: str

    def __init__(..., client: Any | None = None, application_factory: Any | None = None) -> None:
        ...

    async def start(self) -> None:
        if self._client is not None:
            return
        if self.mode == "polling":
            self._application = self._build_application()
            await self._application.initialize()
            await self._application.start()
            await self._application.updater.start_polling()
            self._client = self._application.bot
            return
        if self.mode == "webhook":
            self._client = self._build_bot()
            return
        raise ValueError(f"Unsupported telegram mode: {self.mode}")

    async def stop(self) -> None:
        if self._application is not None:
            await self._application.updater.stop()
            await self._application.stop()
            await self._application.shutdown()

    async def handle_update_payload(self, payload: dict[str, Any]) -> None:
        message = self._normalize_payload(payload)
        if message is not None:
            await self.inbound_sink.accept_inbound(message)

    async def send(self, message: OutboundMessage) -> None:
        target = outbound_target(message)
        if not target.peer_id:
            mark_unclaimed(message)
            return
        client = self._require_client()
        for chunk in chunk_text(message.content, max_chars=self.max_message_chars):
            await client.send_message(chat_id=target.peer_id, text=chunk)
  • Normalization requirements:

    • Use message.text or message.caption.
    • Map private chats to peer_type="dm".
    • Map group, supergroup, and channel chats to peer_type="group" or peer_type="channel".
    • Use message.message_thread_id as thread_id when present.
    • For photo/document/audio/video, append compact_media_summary() to content and add compact media metadata.
    • If requireMentionInGroups is true, accept only text containing @<botUsername>, then strip that mention from content.
  • Run:

cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q

Expected result:

3 passed

Task 4: FeishuAdapter

Files:

  • Create app-instance/backend/beaver/interfaces/channels/platforms/feishu.py
  • Create app-instance/backend/tests/unit/test_feishu_channel_adapter.py

Steps

  • Write tests:
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.feishu import FeishuAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeFeishuClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, receive_id_type: str, receive_id: str, text: str):
        self.sent.append(
            {"receive_id_type": receive_id_type, "receive_id": receive_id, "text": text}
        )


def test_feishu_normalizes_direct_text_event() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={},
            client=FakeFeishuClient(),
        )

        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_chat",
                        "chat_type": "p2p",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )

        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "feishu-main:tenant-main:oc_chat"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "ou_user"

    asyncio.run(run())


def test_feishu_group_mention_gate() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={"requireMentionInGroups": True, "botOpenId": "ou_bot"},
            client=FakeFeishuClient(),
        )

        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_group",
                        "chat_type": "group",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                        "mentions": [],
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m2",
                        "chat_id": "oc_group",
                        "chat_type": "group",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                        "mentions": [{"id": {"open_id": "ou_bot"}}],
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )

        assert len(sink.messages) == 1

    asyncio.run(run())


def test_feishu_sends_text_to_chat_id() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeFeishuClient()
        adapter = FeishuAdapter(
            channel_id="feishu-main",
            kind="feishu",
            mode="websocket",
            account_id="tenant-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "appSecret": "secret"},
            config={},
            client=client,
        )
        await adapter.handle_event_payload(
            {
                "event": {
                    "message": {
                        "message_id": "m1",
                        "chat_id": "oc_chat",
                        "chat_type": "p2p",
                        "message_type": "text",
                        "content": "{\"text\":\"hello\"}",
                    },
                    "sender": {"sender_id": {"open_id": "ou_user"}},
                }
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="feishu-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )

        assert client.sent == [
            {"receive_id_type": "chat_id", "receive_id": "oc_chat", "text": "ok"}
        ]

    asyncio.run(run())
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q

Expected first result:

ModuleNotFoundError
  • Implement FeishuAdapter:

    • Parse event.message.content as JSON when possible.
    • message_type="text" reads content["text"].
    • Non-text message types use compact_media_summary(message_type, file_name=...).
    • chat_type="p2p" maps to peer_type="dm".
    • Any other chat type maps to peer_type="group".
    • sender.sender_id.open_id becomes user_id.
    • Mention gate checks message.mentions[*].id.open_id against config.botOpenId.
    • send() calls client.send_text(receive_id_type="chat_id", receive_id=target.peer_id, text=chunk).
    • Live client creation is isolated in _build_client() and imports lark_oapi only inside that method.
  • Run:

cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q

Expected result:

3 passed

Task 5: QQBotAdapter

Files:

  • Create app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py
  • Create app-instance/backend/tests/unit/test_qqbot_channel_adapter.py

Steps

  • Write tests:
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeQQBotClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, peer_type: str, peer_id: str, content: str, message_id: str | None):
        self.sent.append(
            {
                "peer_type": peer_type,
                "peer_id": peer_id,
                "content": content,
                "message_id": message_id,
            }
        )


def test_qqbot_normalizes_private_c2c_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=FakeQQBotClient(),
        )

        await adapter.handle_event_payload(
            {
                "t": "C2C_MESSAGE_CREATE",
                "d": {
                    "id": "m1",
                    "author": {"user_openid": "u1"},
                    "content": "hello",
                },
            }
        )

        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "qq-main:qq-bot:u1"
        assert message.channel_identity.peer_type == "dm"
        assert message.channel_identity.user_id == "u1"

    asyncio.run(run())


def test_qqbot_normalizes_group_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=FakeQQBotClient(),
        )

        await adapter.handle_event_payload(
            {
                "t": "GROUP_AT_MESSAGE_CREATE",
                "d": {
                    "id": "m2",
                    "group_openid": "g1",
                    "author": {"member_openid": "u1"},
                    "content": "hello group",
                },
            }
        )

        message = sink.messages[0]
        assert message.session_id == "qq-main:qq-bot:g1"
        assert message.channel_identity.peer_type == "group"
        assert message.channel_identity.user_id == "u1"

    asyncio.run(run())


def test_qqbot_sends_reply_with_original_message_id() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeQQBotClient()
        adapter = QQBotAdapter(
            channel_id="qq-main",
            kind="qqbot",
            mode="websocket",
            account_id="qq-bot",
            display_name=None,
            inbound_sink=sink,
            secrets={"appId": "app", "clientSecret": "secret"},
            config={},
            client=client,
        )
        await adapter.handle_event_payload(
            {
                "t": "GROUP_AT_MESSAGE_CREATE",
                "d": {
                    "id": "m2",
                    "group_openid": "g1",
                    "author": {"member_openid": "u1"},
                    "content": "hello group",
                },
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="qq-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
            )
        )

        assert client.sent[0] == {
            "peer_type": "group",
            "peer_id": "g1",
            "content": "ok",
            "message_id": "m2",
        }

    asyncio.run(run())
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q

Expected first result:

ModuleNotFoundError
  • Implement QQBotAdapter:

    • C2C_MESSAGE_CREATE: peer_id=author.user_openid, peer_type="dm".
    • GROUP_AT_MESSAGE_CREATE: peer_id=group_openid, peer_type="group", user_id=author.member_openid.
    • Guild/channel events with guild_id and channel_id: peer_id=channel_id, thread_id=guild_id, peer_type="channel".
    • Attachment arrays become compact media summaries and metadata entries.
    • Apply dmPolicy, groupPolicy, allowFrom, and groupAllowFrom before accepting.
    • send() calls the injected client send_text().
    • Live websocket and REST client setup live behind methods that import optional packages only on start.
  • Run:

cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q

Expected result:

3 passed

Task 6: WeixinAdapter

Files:

  • Create app-instance/backend/beaver/interfaces/channels/platforms/weixin.py
  • Create app-instance/backend/tests/unit/test_weixin_channel_adapter.py

Steps

  • Write tests:
import asyncio

from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter


class FakeSink:
    def __init__(self) -> None:
        self.messages = []

    async def accept_inbound(self, message):
        self.messages.append(message)


class FakeWeixinClient:
    def __init__(self) -> None:
        self.sent = []

    async def send_text(self, *, peer_id: str, text: str, context_token: str | None):
        self.sent.append({"peer_id": peer_id, "text": text, "context_token": context_token})


def test_weixin_normalizes_direct_text_message() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={},
            client=FakeWeixinClient(),
        )

        await adapter.handle_message_payload(
            {
                "id": "m1",
                "from": "wx_user",
                "room_id": "",
                "type": "text",
                "text": "hello",
                "context_token": "ctx1",
            }
        )

        message = sink.messages[0]
        assert message.content == "hello"
        assert message.session_id == "weixin-main:wx-main:wx_user"
        assert message.channel_identity.peer_type == "dm"
        assert message.metadata["context_token"] == "ctx1"

    asyncio.run(run())


def test_weixin_group_message_is_best_effort() -> None:
    async def run() -> None:
        sink = FakeSink()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={"groupPolicy": "open"},
            client=FakeWeixinClient(),
        )

        await adapter.handle_message_payload(
            {
                "id": "m2",
                "from": "wx_user",
                "room_id": "room1",
                "type": "text",
                "text": "hello room",
                "context_token": "ctx2",
            }
        )

        message = sink.messages[0]
        assert message.session_id == "weixin-main:wx-main:room1"
        assert message.channel_identity.peer_type == "group"
        assert message.channel_identity.user_id == "wx_user"

    asyncio.run(run())


def test_weixin_sends_text_with_context_token() -> None:
    async def run() -> None:
        sink = FakeSink()
        client = FakeWeixinClient()
        adapter = WeixinAdapter(
            channel_id="weixin-main",
            kind="weixin",
            mode="polling",
            account_id="wx-main",
            display_name=None,
            inbound_sink=sink,
            secrets={"token": "token"},
            config={},
            client=client,
        )
        await adapter.handle_message_payload(
            {
                "id": "m1",
                "from": "wx_user",
                "type": "text",
                "text": "hello",
                "context_token": "ctx1",
            }
        )
        await adapter.send(
            OutboundMessage(
                channel="weixin-main",
                content="ok",
                session_id=sink.messages[0].session_id,
                finish_reason="stop",
                channel_identity=sink.messages[0].channel_identity,
                metadata={"inbound_metadata": sink.messages[0].metadata},
            )
        )

        assert client.sent == [{"peer_id": "wx_user", "text": "ok", "context_token": "ctx1"}]

    asyncio.run(run())
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_weixin_channel_adapter.py -q

Expected first result:

ModuleNotFoundError
  • Implement WeixinAdapter:

    • Direct messages use peer_id=from, peer_type="dm".
    • Group messages use peer_id=room_id, peer_type="group", user_id=from.
    • Default dmPolicy is open.
    • Default groupPolicy is disabled.
    • Preserve context_token in inbound metadata.
    • send() reads context_token from message.metadata["inbound_metadata"] when available.
    • Live polling setup uses configured baseUrl, cdnBaseUrl, and token.
    • QR credential loading is represented as explicit config/state read inside the live client builder.
  • Run:

cd app-instance/backend
uv run pytest tests/unit/test_weixin_channel_adapter.py -q

Expected result:

3 passed

Task 7: Import Guards and Optional Extras

Files:

  • Modify app-instance/backend/pyproject.toml
  • Modify app-instance/backend/tests/unit/test_imports.py

Steps

  • Update pyproject.toml optional extras exactly as described in Dependency Strategy.

  • Add import tests:

def test_platform_channel_modules_import_without_live_clients() -> None:
    from beaver.interfaces.channels.platforms.feishu import FeishuAdapter
    from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter
    from beaver.interfaces.channels.platforms.telegram import TelegramAdapter
    from beaver.interfaces.channels.platforms.weixin import WeixinAdapter

    assert FeishuAdapter.KIND == "feishu"
    assert QQBotAdapter.KIND == "qqbot"
    assert TelegramAdapter.KIND == "telegram"
    assert WeixinAdapter.KIND == "weixin"
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_imports.py -q

Expected result:

passed

Task 8: Runtime Startup Isolation

Files:

  • Modify app-instance/backend/tests/unit/test_channel_runtime.py
  • Modify adapter files as needed

Steps

  • Add a startup isolation test:
def test_channel_runtime_platform_start_failure_does_not_stop_other_channels(tmp_path) -> None:
    async def run() -> None:
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            workspace=tmp_path,
            channels={
                "telegram-main": ChannelConfig(
                    enabled=True,
                    kind="telegram",
                    mode="polling",
                    account_id="bot-main",
                    secrets={},
                ),
                "off": ChannelConfig(
                    enabled=False,
                    kind="weixin",
                    mode="polling",
                    account_id="wx-main",
                ),
            },
        )

        await runtime.start()
        try:
            by_id = {item["channel_id"]: item for item in runtime.statuses()}
        finally:
            await runtime.stop()

        assert by_id["telegram-main"]["state"] == "error"
        assert "botToken" in by_id["telegram-main"]["last_error"]
        assert by_id["off"]["state"] == "disabled"

    asyncio.run(run())
  • Ensure each live adapter builder validates required secrets only when start() needs a live client:
def _require_secret(self, key: str) -> str:
    value = self.secrets.get(key)
    if not value:
        raise ValueError(f"{key} is required")
    return str(value)
  • Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q

Expected result:

passed

Task 9: Cross-Channel Verification

Files: no new files unless a previous task reveals a test gap.

Steps

  • Run all focused channel tests:
cd app-instance/backend
uv run pytest \
  tests/unit/test_platform_channel_helpers.py \
  tests/unit/test_telegram_channel_adapter.py \
  tests/unit/test_feishu_channel_adapter.py \
  tests/unit/test_qqbot_channel_adapter.py \
  tests/unit/test_weixin_channel_adapter.py \
  tests/unit/test_channel_runtime.py \
  tests/unit/test_imports.py \
  -q

Expected result:

passed
  • Run the broader backend unit suite:
cd app-instance/backend
uv run pytest tests/unit -q

Expected result:

passed
  • Inspect changed files:
git status --short
git diff -- app-instance/backend/beaver/interfaces/channels app-instance/backend/tests/unit app-instance/backend/pyproject.toml
  • Confirm these invariants manually from the diff:

    • ChannelRuntime still owns MessageBus and agent dispatch.
    • Platform adapters call only ChannelInboundSink.accept_inbound() for inbound routing.
    • Platform adapters do not call AgentService.
    • Optional platform packages are imported only inside live client or live transport builders.
    • Tests use fake clients and fake payloads.
    • No adapter sends when the outbound target cannot be resolved; it marks delivery_status="unclaimed" instead.

Implementation Order

  1. Task 1: Shared Platform Helpers
  2. Task 3: TelegramAdapter
  3. Task 4: FeishuAdapter
  4. Task 5: QQBotAdapter
  5. Task 6: WeixinAdapter
  6. Task 2: Runtime Factory and Status
  7. Task 7: Import Guards and Optional Extras
  8. Task 8: Runtime Startup Isolation
  9. Task 9: Cross-Channel Verification

This order proves the shared adapter pattern with Telegram first, then adds the two official bot-style channels, leaves Weixin adapter behavior last because context tokens and login state make it the most specialized, and only then connects the concrete modules to the runtime factory.