44 KiB
Chat Platform Channel Adapters Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use
superpowers:subagent-driven-developmentfor implementation across the independent adapter tasks, orsuperpowers:executing-plansfor inline execution. Track progress with the checkbox steps below.
Goal: Add first-class Beaver adapters for telegram, feishu, qqbot, and weixin, each plugged into ChannelRuntime, normalizing inbound platform events into InboundMessage with ChannelIdentity, and delivering final OutboundMessage replies back to the original platform conversation.
Architecture: Platform adapters are hosted in the backend process. They own transport and platform delivery details. ChannelRuntime remains the owner of lifecycle, dedupe, event logging, bus routing, and agent dispatch.
platform SDK/API
-> {Channel}Adapter
-> ChannelRuntime.accept_inbound()
-> MessageBus.inbound
-> ChannelRuntime bridge
-> AgentService.handle_inbound_message()
-> MessageBus.outbound
-> ChannelManager.dispatch_outbound()
-> {Channel}Adapter.send()
-> platform SDK/API
Locked Decisions
- Implement these adapters only:
FeishuAdapter,QQBotAdapter,TelegramAdapter,WeixinAdapter. - Do not add WhatsApp in this phase.
- Do not add a sidecar process as the default channel path.
- Keep adapter class names platform-only:
FeishuAdapter,QQBotAdapter,TelegramAdapter,WeixinAdapter. - Keep platform imports lazy, so backend imports and unrelated channels work without every optional package installed.
- Adapter constructors must accept injectable clients or factories for unit tests.
- Unit tests must not call real platform networks.
- Runtime adapter selection is by existing
ChannelConfig.kindandChannelConfig.mode. - Inbound messages must include
ChannelIdentity. - Outbound send targets must prefer
message.channel_identity, then fall back to the normalizedmessage.session_id. - Unsupported or incomplete media appears in content and compact metadata instead of being silently dropped.
- Streaming token deltas, pairing flows, hot reload, and platform config UI are not included.
- Weixin group delivery is best-effort only and is not listed as a stable group capability.
File Structure
- Create:
app-instance/backend/beaver/interfaces/channels/platforms/__init__.py - Create:
app-instance/backend/beaver/interfaces/channels/platforms/base.py - Create:
app-instance/backend/beaver/interfaces/channels/platforms/telegram.py - Create:
app-instance/backend/beaver/interfaces/channels/platforms/feishu.py - Create:
app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py - Create:
app-instance/backend/beaver/interfaces/channels/platforms/weixin.py - Modify:
app-instance/backend/beaver/interfaces/channels/runtime.py - Modify:
app-instance/backend/pyproject.toml - Create:
app-instance/backend/tests/unit/test_platform_channel_helpers.py - Create:
app-instance/backend/tests/unit/test_telegram_channel_adapter.py - Create:
app-instance/backend/tests/unit/test_feishu_channel_adapter.py - Create:
app-instance/backend/tests/unit/test_qqbot_channel_adapter.py - Create:
app-instance/backend/tests/unit/test_weixin_channel_adapter.py - Modify:
app-instance/backend/tests/unit/test_channel_runtime.py - Modify:
app-instance/backend/tests/unit/test_imports.py
Dependency Strategy
Add optional extras for live adapters. Do not put these packages in the base backend dependency list.
[project.optional-dependencies]
dev = [
"pytest>=9.0.0,<10.0.0",
]
telegram = [
"python-telegram-bot>=22.0,<23.0",
]
feishu = [
"lark-oapi>=1.4.22,<2.0.0",
]
qqbot = [
"aiohttp>=3.9.0,<4.0.0",
]
weixin = [
"aiohttp>=3.9.0,<4.0.0",
]
channels = [
"python-telegram-bot>=22.0,<23.0",
"lark-oapi>=1.4.22,<2.0.0",
"aiohttp>=3.9.0,<4.0.0",
]
The plan keeps httpx as the default HTTP client where it is already sufficient. Use aiohttp only for platform SDK paths that require it.
Shared Data Rules
All normalized identities use:
<channel_id>:<account_id>:<peer_id>[:<thread_id>]
Outbound target resolution:
def outbound_target(message: OutboundMessage) -> OutboundTarget:
if message.channel_identity is not None:
identity = message.channel_identity
return OutboundTarget(
peer_id=identity.peer_id,
thread_id=identity.thread_id,
peer_type=identity.peer_type,
user_id=identity.user_id,
)
return target_from_session_id(message.session_id)
If neither identity nor session id can produce a peer id, adapter send() sets message.metadata["delivery_status"] = "unclaimed" and returns without calling the platform client.
Task 1: Shared Platform Helpers
Files:
- Create
app-instance/backend/beaver/interfaces/channels/platforms/__init__.py - Create
app-instance/backend/beaver/interfaces/channels/platforms/base.py - Create
app-instance/backend/tests/unit/test_platform_channel_helpers.py
Steps
- Write failing tests in
test_platform_channel_helpers.py:
from beaver.foundation.events import ChannelIdentity, OutboundMessage
from beaver.interfaces.channels.platforms.base import (
chunk_text,
compact_media_summary,
config_bool,
config_list,
outbound_target,
)
def test_config_helpers_normalize_common_values() -> None:
assert config_bool({"enabled": "true"}, "enabled", default=False) is True
assert config_bool({"enabled": "0"}, "enabled", default=True) is False
assert config_list({"allowFrom": "u1,u2"}, "allowFrom") == ["u1", "u2"]
assert config_list({"allowFrom": ["u1", 2]}, "allowFrom") == ["u1", "2"]
def test_chunk_text_preserves_order_and_limit() -> None:
chunks = chunk_text("abcdef", max_chars=2)
assert chunks == ["ab", "cd", "ef"]
def test_outbound_target_prefers_channel_identity() -> None:
identity = ChannelIdentity(
channel_id="telegram-main",
kind="telegram",
account_id="bot-main",
peer_id="chat-1",
thread_id="topic-1",
peer_type="group",
user_id="user-1",
)
message = OutboundMessage(
channel="telegram-main",
content="ok",
session_id="ignored",
finish_reason="stop",
channel_identity=identity,
)
target = outbound_target(message)
assert target.peer_id == "chat-1"
assert target.thread_id == "topic-1"
assert target.peer_type == "group"
assert target.user_id == "user-1"
def test_outbound_target_falls_back_to_session_id() -> None:
message = OutboundMessage(
channel="telegram-main",
content="ok",
session_id="telegram-main:bot-main:chat-1:topic-1",
finish_reason="stop",
)
target = outbound_target(message)
assert target.peer_id == "chat-1"
assert target.thread_id == "topic-1"
def test_compact_media_summary_mentions_attachment_type() -> None:
assert compact_media_summary("photo", file_name="cat.png") == "[photo: cat.png]"
assert compact_media_summary("document") == "[document]"
- Run the focused test and confirm it fails because the module does not exist:
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q
Expected first result:
ModuleNotFoundError: No module named 'beaver.interfaces.channels.platforms'
- Implement
platforms/__init__.py:
"""Platform channel adapters."""
- Implement
platforms/base.py:
"""Shared helpers for platform channel adapters."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage
@dataclass(slots=True)
class OutboundTarget:
peer_id: str | None
thread_id: str | None = None
peer_type: str = "unknown"
user_id: str | None = None
class PlatformDeliveryError(RuntimeError):
"""Raised when a platform client rejects a delivery."""
def config_bool(config: dict[str, Any], key: str, *, default: bool = False) -> bool:
value = config.get(key)
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
text = str(value).strip().lower()
if text in {"1", "true", "yes", "on"}:
return True
if text in {"0", "false", "no", "off"}:
return False
return default
def config_list(config: dict[str, Any], key: str) -> list[str]:
value = config.get(key)
if value is None:
return []
if isinstance(value, str):
return [part.strip() for part in value.split(",") if part.strip()]
if isinstance(value, (list, tuple, set)):
return [str(item).strip() for item in value if str(item).strip()]
return [str(value).strip()] if str(value).strip() else []
def chunk_text(text: str, *, max_chars: int) -> list[str]:
if max_chars <= 0:
raise ValueError("max_chars must be positive")
if not text:
return [""]
return [text[index : index + max_chars] for index in range(0, len(text), max_chars)]
def compact_media_summary(media_type: str, *, file_name: str | None = None) -> str:
label = str(media_type or "attachment").strip() or "attachment"
if file_name:
return f"[{label}: {file_name}]"
return f"[{label}]"
def target_from_session_id(session_id: str | None) -> OutboundTarget:
if not session_id:
return OutboundTarget(peer_id=None)
parts = str(session_id).split(":")
if len(parts) < 3:
return OutboundTarget(peer_id=None)
thread_id = parts[3] if len(parts) > 3 and parts[3] else None
return OutboundTarget(peer_id=parts[2] or None, thread_id=thread_id)
def outbound_target(message: OutboundMessage) -> OutboundTarget:
identity = message.channel_identity
if identity is None:
return target_from_session_id(message.session_id)
return OutboundTarget(
peer_id=identity.peer_id,
thread_id=identity.thread_id,
peer_type=identity.peer_type,
user_id=identity.user_id,
)
def mark_unclaimed(message: OutboundMessage) -> None:
message.metadata["delivery_status"] = "unclaimed"
def build_inbound_message(
*,
channel_id: str,
kind: str,
account_id: str,
peer_id: str,
content: str,
message_id: str | None,
peer_type: str,
user_id: str | None = None,
thread_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> InboundMessage:
identity = ChannelIdentity(
channel_id=channel_id,
kind=kind,
account_id=account_id,
peer_id=peer_id,
thread_id=thread_id,
peer_type=peer_type,
user_id=user_id,
message_id=message_id,
)
return InboundMessage(
channel=channel_id,
content=content,
session_id=identity.session_id(),
user_id=user_id,
message_id=message_id or "",
channel_identity=identity,
metadata=metadata or {},
)
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_platform_channel_helpers.py -q
Expected result:
5 passed
Task 2: Runtime Factory and Status
Files:
- Modify
app-instance/backend/beaver/interfaces/channels/runtime.py - Modify
app-instance/backend/tests/unit/test_channel_runtime.py
Execute this task after the four adapter files exist from Tasks 3 through 6. The factory imports concrete adapter modules, so those modules must be present before the runtime test can pass.
Steps
- Add failing tests to
test_channel_runtime.py:
def test_channel_runtime_builds_platform_adapters_without_starting_networks(tmp_path) -> None:
runtime = ChannelRuntime(
service=FakeAgentService(),
workspace=tmp_path,
channels={},
)
cases = {
"telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
"feishu-main": ChannelConfig(enabled=True, kind="feishu", mode="websocket", account_id="tenant-main"),
"qq-main": ChannelConfig(enabled=True, kind="qqbot", mode="websocket", account_id="qq-main"),
"weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
}
for channel_id, cfg in cases.items():
adapter = runtime._build_adapter(channel_id, cfg)
assert adapter.channel_id == channel_id
assert adapter.kind == cfg.kind
assert adapter.mode == cfg.mode
def test_channel_runtime_reports_platform_capabilities(tmp_path) -> None:
runtime = ChannelRuntime(
service=FakeAgentService(),
workspace=tmp_path,
channels={
"telegram-main": ChannelConfig(enabled=True, kind="telegram", mode="polling", account_id="bot-main"),
"weixin-main": ChannelConfig(enabled=True, kind="weixin", mode="polling", account_id="wx-main"),
},
)
by_id = {item["channel_id"]: item for item in runtime.statuses()}
assert by_id["telegram-main"]["capabilities"] == [
"receive_text",
"send_text",
"receive_media",
"groups",
]
assert by_id["weixin-main"]["capabilities"] == [
"receive_text",
"send_text",
"receive_media",
"direct_messages",
]
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
Expected first result:
ValueError: Unsupported channel kind/mode
- Add a compact capability helper in
runtime.py:
def _channel_capabilities(kind: str, mode: str) -> list[str]:
if kind == "webhook":
return ["receive_text", "send_text", "sync_webhook_response"]
if kind == "terminal" and mode == "websocket":
return ["receive_text", "send_text", "persistent_connection"]
if kind in {"feishu", "qqbot", "telegram"}:
return ["receive_text", "send_text", "receive_media", "groups"]
if kind == "weixin":
return ["receive_text", "send_text", "receive_media", "direct_messages"]
return []
-
Use
_channel_capabilities(cfg.kind, cfg.mode)insidestatuses()while preserving the existing webhook and terminal URL fields. -
Extend
_build_adapter():
if cfg.kind == "telegram" and cfg.mode in {"polling", "webhook"}:
from beaver.interfaces.channels.platforms.telegram import TelegramAdapter
return TelegramAdapter(
channel_id=channel_id,
kind=cfg.kind,
mode=cfg.mode,
account_id=cfg.account_id,
display_name=cfg.display_name,
inbound_sink=self,
secrets=cfg.secrets,
config=cfg.config,
event_recorder=self.record_event,
)
if cfg.kind == "feishu" and cfg.mode in {"websocket", "webhook"}:
from beaver.interfaces.channels.platforms.feishu import FeishuAdapter
return FeishuAdapter(
channel_id=channel_id,
kind=cfg.kind,
mode=cfg.mode,
account_id=cfg.account_id,
display_name=cfg.display_name,
inbound_sink=self,
secrets=cfg.secrets,
config=cfg.config,
event_recorder=self.record_event,
)
if cfg.kind == "qqbot" and cfg.mode == "websocket":
from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter
return QQBotAdapter(
channel_id=channel_id,
kind=cfg.kind,
mode=cfg.mode,
account_id=cfg.account_id,
display_name=cfg.display_name,
inbound_sink=self,
secrets=cfg.secrets,
config=cfg.config,
event_recorder=self.record_event,
)
if cfg.kind == "weixin" and cfg.mode == "polling":
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter
return WeixinAdapter(
channel_id=channel_id,
kind=cfg.kind,
mode=cfg.mode,
account_id=cfg.account_id,
display_name=cfg.display_name,
inbound_sink=self,
secrets=cfg.secrets,
config=cfg.config,
event_recorder=self.record_event,
)
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
Expected result includes:
passed
Task 3: TelegramAdapter
Files:
- Create
app-instance/backend/beaver/interfaces/channels/platforms/telegram.py - Create
app-instance/backend/tests/unit/test_telegram_channel_adapter.py
Steps
- Write tests using fake sink and fake client:
import asyncio
from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.telegram import TelegramAdapter
class FakeSink:
def __init__(self) -> None:
self.messages = []
async def accept_inbound(self, message):
self.messages.append(message)
class FakeTelegramClient:
def __init__(self) -> None:
self.sent = []
async def send_message(self, **kwargs):
self.sent.append(kwargs)
def test_telegram_normalizes_private_text_message() -> None:
async def run() -> None:
sink = FakeSink()
adapter = TelegramAdapter(
channel_id="telegram-main",
kind="telegram",
mode="polling",
account_id="bot-main",
display_name=None,
inbound_sink=sink,
secrets={"botToken": "x"},
config={},
client=FakeTelegramClient(),
)
await adapter.handle_update_payload(
{
"message": {
"message_id": 100,
"text": "hello",
"chat": {"id": 200, "type": "private"},
"from": {"id": 300, "username": "ivan"},
}
}
)
message = sink.messages[0]
assert message.channel == "telegram-main"
assert message.content == "hello"
assert message.session_id == "telegram-main:bot-main:200"
assert message.channel_identity.peer_type == "dm"
assert message.channel_identity.user_id == "300"
assert message.channel_identity.message_id == "100"
asyncio.run(run())
def test_telegram_group_requires_mention_when_configured() -> None:
async def run() -> None:
sink = FakeSink()
adapter = TelegramAdapter(
channel_id="telegram-main",
kind="telegram",
mode="polling",
account_id="bot-main",
display_name=None,
inbound_sink=sink,
secrets={"botToken": "x"},
config={"requireMentionInGroups": True, "botUsername": "beaver_bot"},
client=FakeTelegramClient(),
)
await adapter.handle_update_payload(
{
"message": {
"message_id": 101,
"text": "hello group",
"chat": {"id": -20, "type": "group"},
"from": {"id": 300},
}
}
)
await adapter.handle_update_payload(
{
"message": {
"message_id": 102,
"text": "@beaver_bot hello",
"chat": {"id": -20, "type": "group"},
"from": {"id": 300},
}
}
)
assert len(sink.messages) == 1
assert sink.messages[0].content == "hello"
asyncio.run(run())
def test_telegram_sends_chunked_reply_to_identity_target() -> None:
async def run() -> None:
sink = FakeSink()
client = FakeTelegramClient()
adapter = TelegramAdapter(
channel_id="telegram-main",
kind="telegram",
mode="polling",
account_id="bot-main",
display_name=None,
inbound_sink=sink,
secrets={"botToken": "x"},
config={"maxMessageChars": 3},
client=client,
)
await adapter.handle_update_payload(
{
"message": {
"message_id": 100,
"text": "hello",
"chat": {"id": 200, "type": "private"},
"from": {"id": 300},
}
}
)
await adapter.send(
OutboundMessage(
channel="telegram-main",
content="abcdef",
session_id=sink.messages[0].session_id,
finish_reason="stop",
channel_identity=sink.messages[0].channel_identity,
)
)
assert [item["text"] for item in client.sent] == ["abc", "def"]
assert client.sent[0]["chat_id"] == "200"
asyncio.run(run())
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q
Expected first result:
ModuleNotFoundError
- Implement
TelegramAdapterwith these methods:
class TelegramAdapter:
channel_id: str
kind: str
mode: str
def __init__(..., client: Any | None = None, application_factory: Any | None = None) -> None:
...
async def start(self) -> None:
if self._client is not None:
return
if self.mode == "polling":
self._application = self._build_application()
await self._application.initialize()
await self._application.start()
await self._application.updater.start_polling()
self._client = self._application.bot
return
if self.mode == "webhook":
self._client = self._build_bot()
return
raise ValueError(f"Unsupported telegram mode: {self.mode}")
async def stop(self) -> None:
if self._application is not None:
await self._application.updater.stop()
await self._application.stop()
await self._application.shutdown()
async def handle_update_payload(self, payload: dict[str, Any]) -> None:
message = self._normalize_payload(payload)
if message is not None:
await self.inbound_sink.accept_inbound(message)
async def send(self, message: OutboundMessage) -> None:
target = outbound_target(message)
if not target.peer_id:
mark_unclaimed(message)
return
client = self._require_client()
for chunk in chunk_text(message.content, max_chars=self.max_message_chars):
await client.send_message(chat_id=target.peer_id, text=chunk)
-
Normalization requirements:
- Use
message.textormessage.caption. - Map private chats to
peer_type="dm". - Map group, supergroup, and channel chats to
peer_type="group"orpeer_type="channel". - Use
message.message_thread_idasthread_idwhen present. - For photo/document/audio/video, append
compact_media_summary()to content and add compact media metadata. - If
requireMentionInGroupsis true, accept only text containing@<botUsername>, then strip that mention from content.
- Use
-
Run:
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_adapter.py -q
Expected result:
3 passed
Task 4: FeishuAdapter
Files:
- Create
app-instance/backend/beaver/interfaces/channels/platforms/feishu.py - Create
app-instance/backend/tests/unit/test_feishu_channel_adapter.py
Steps
- Write tests:
import asyncio
from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.feishu import FeishuAdapter
class FakeSink:
def __init__(self) -> None:
self.messages = []
async def accept_inbound(self, message):
self.messages.append(message)
class FakeFeishuClient:
def __init__(self) -> None:
self.sent = []
async def send_text(self, *, receive_id_type: str, receive_id: str, text: str):
self.sent.append(
{"receive_id_type": receive_id_type, "receive_id": receive_id, "text": text}
)
def test_feishu_normalizes_direct_text_event() -> None:
async def run() -> None:
sink = FakeSink()
adapter = FeishuAdapter(
channel_id="feishu-main",
kind="feishu",
mode="websocket",
account_id="tenant-main",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "appSecret": "secret"},
config={},
client=FakeFeishuClient(),
)
await adapter.handle_event_payload(
{
"event": {
"message": {
"message_id": "m1",
"chat_id": "oc_chat",
"chat_type": "p2p",
"message_type": "text",
"content": "{\"text\":\"hello\"}",
},
"sender": {"sender_id": {"open_id": "ou_user"}},
}
}
)
message = sink.messages[0]
assert message.content == "hello"
assert message.session_id == "feishu-main:tenant-main:oc_chat"
assert message.channel_identity.peer_type == "dm"
assert message.channel_identity.user_id == "ou_user"
asyncio.run(run())
def test_feishu_group_mention_gate() -> None:
async def run() -> None:
sink = FakeSink()
adapter = FeishuAdapter(
channel_id="feishu-main",
kind="feishu",
mode="websocket",
account_id="tenant-main",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "appSecret": "secret"},
config={"requireMentionInGroups": True, "botOpenId": "ou_bot"},
client=FakeFeishuClient(),
)
await adapter.handle_event_payload(
{
"event": {
"message": {
"message_id": "m1",
"chat_id": "oc_group",
"chat_type": "group",
"message_type": "text",
"content": "{\"text\":\"hello\"}",
"mentions": [],
},
"sender": {"sender_id": {"open_id": "ou_user"}},
}
}
)
await adapter.handle_event_payload(
{
"event": {
"message": {
"message_id": "m2",
"chat_id": "oc_group",
"chat_type": "group",
"message_type": "text",
"content": "{\"text\":\"hello\"}",
"mentions": [{"id": {"open_id": "ou_bot"}}],
},
"sender": {"sender_id": {"open_id": "ou_user"}},
}
}
)
assert len(sink.messages) == 1
asyncio.run(run())
def test_feishu_sends_text_to_chat_id() -> None:
async def run() -> None:
sink = FakeSink()
client = FakeFeishuClient()
adapter = FeishuAdapter(
channel_id="feishu-main",
kind="feishu",
mode="websocket",
account_id="tenant-main",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "appSecret": "secret"},
config={},
client=client,
)
await adapter.handle_event_payload(
{
"event": {
"message": {
"message_id": "m1",
"chat_id": "oc_chat",
"chat_type": "p2p",
"message_type": "text",
"content": "{\"text\":\"hello\"}",
},
"sender": {"sender_id": {"open_id": "ou_user"}},
}
}
)
await adapter.send(
OutboundMessage(
channel="feishu-main",
content="ok",
session_id=sink.messages[0].session_id,
finish_reason="stop",
channel_identity=sink.messages[0].channel_identity,
)
)
assert client.sent == [
{"receive_id_type": "chat_id", "receive_id": "oc_chat", "text": "ok"}
]
asyncio.run(run())
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q
Expected first result:
ModuleNotFoundError
-
Implement
FeishuAdapter:- Parse
event.message.contentas JSON when possible. message_type="text"readscontent["text"].- Non-text message types use
compact_media_summary(message_type, file_name=...). chat_type="p2p"maps topeer_type="dm".- Any other chat type maps to
peer_type="group". sender.sender_id.open_idbecomesuser_id.- Mention gate checks
message.mentions[*].id.open_idagainstconfig.botOpenId. send()callsclient.send_text(receive_id_type="chat_id", receive_id=target.peer_id, text=chunk).- Live client creation is isolated in
_build_client()and importslark_oapionly inside that method.
- Parse
-
Run:
cd app-instance/backend
uv run pytest tests/unit/test_feishu_channel_adapter.py -q
Expected result:
3 passed
Task 5: QQBotAdapter
Files:
- Create
app-instance/backend/beaver/interfaces/channels/platforms/qqbot.py - Create
app-instance/backend/tests/unit/test_qqbot_channel_adapter.py
Steps
- Write tests:
import asyncio
from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter
class FakeSink:
def __init__(self) -> None:
self.messages = []
async def accept_inbound(self, message):
self.messages.append(message)
class FakeQQBotClient:
def __init__(self) -> None:
self.sent = []
async def send_text(self, *, peer_type: str, peer_id: str, content: str, message_id: str | None):
self.sent.append(
{
"peer_type": peer_type,
"peer_id": peer_id,
"content": content,
"message_id": message_id,
}
)
def test_qqbot_normalizes_private_c2c_message() -> None:
async def run() -> None:
sink = FakeSink()
adapter = QQBotAdapter(
channel_id="qq-main",
kind="qqbot",
mode="websocket",
account_id="qq-bot",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "clientSecret": "secret"},
config={},
client=FakeQQBotClient(),
)
await adapter.handle_event_payload(
{
"t": "C2C_MESSAGE_CREATE",
"d": {
"id": "m1",
"author": {"user_openid": "u1"},
"content": "hello",
},
}
)
message = sink.messages[0]
assert message.content == "hello"
assert message.session_id == "qq-main:qq-bot:u1"
assert message.channel_identity.peer_type == "dm"
assert message.channel_identity.user_id == "u1"
asyncio.run(run())
def test_qqbot_normalizes_group_message() -> None:
async def run() -> None:
sink = FakeSink()
adapter = QQBotAdapter(
channel_id="qq-main",
kind="qqbot",
mode="websocket",
account_id="qq-bot",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "clientSecret": "secret"},
config={},
client=FakeQQBotClient(),
)
await adapter.handle_event_payload(
{
"t": "GROUP_AT_MESSAGE_CREATE",
"d": {
"id": "m2",
"group_openid": "g1",
"author": {"member_openid": "u1"},
"content": "hello group",
},
}
)
message = sink.messages[0]
assert message.session_id == "qq-main:qq-bot:g1"
assert message.channel_identity.peer_type == "group"
assert message.channel_identity.user_id == "u1"
asyncio.run(run())
def test_qqbot_sends_reply_with_original_message_id() -> None:
async def run() -> None:
sink = FakeSink()
client = FakeQQBotClient()
adapter = QQBotAdapter(
channel_id="qq-main",
kind="qqbot",
mode="websocket",
account_id="qq-bot",
display_name=None,
inbound_sink=sink,
secrets={"appId": "app", "clientSecret": "secret"},
config={},
client=client,
)
await adapter.handle_event_payload(
{
"t": "GROUP_AT_MESSAGE_CREATE",
"d": {
"id": "m2",
"group_openid": "g1",
"author": {"member_openid": "u1"},
"content": "hello group",
},
}
)
await adapter.send(
OutboundMessage(
channel="qq-main",
content="ok",
session_id=sink.messages[0].session_id,
finish_reason="stop",
channel_identity=sink.messages[0].channel_identity,
)
)
assert client.sent[0] == {
"peer_type": "group",
"peer_id": "g1",
"content": "ok",
"message_id": "m2",
}
asyncio.run(run())
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q
Expected first result:
ModuleNotFoundError
-
Implement
QQBotAdapter:C2C_MESSAGE_CREATE:peer_id=author.user_openid,peer_type="dm".GROUP_AT_MESSAGE_CREATE:peer_id=group_openid,peer_type="group",user_id=author.member_openid.- Guild/channel events with
guild_idandchannel_id:peer_id=channel_id,thread_id=guild_id,peer_type="channel". - Attachment arrays become compact media summaries and metadata entries.
- Apply
dmPolicy,groupPolicy,allowFrom, andgroupAllowFrombefore accepting. send()calls the injected clientsend_text().- Live websocket and REST client setup live behind methods that import optional packages only on start.
-
Run:
cd app-instance/backend
uv run pytest tests/unit/test_qqbot_channel_adapter.py -q
Expected result:
3 passed
Task 6: WeixinAdapter
Files:
- Create
app-instance/backend/beaver/interfaces/channels/platforms/weixin.py - Create
app-instance/backend/tests/unit/test_weixin_channel_adapter.py
Steps
- Write tests:
import asyncio
from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter
class FakeSink:
def __init__(self) -> None:
self.messages = []
async def accept_inbound(self, message):
self.messages.append(message)
class FakeWeixinClient:
def __init__(self) -> None:
self.sent = []
async def send_text(self, *, peer_id: str, text: str, context_token: str | None):
self.sent.append({"peer_id": peer_id, "text": text, "context_token": context_token})
def test_weixin_normalizes_direct_text_message() -> None:
async def run() -> None:
sink = FakeSink()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={},
client=FakeWeixinClient(),
)
await adapter.handle_message_payload(
{
"id": "m1",
"from": "wx_user",
"room_id": "",
"type": "text",
"text": "hello",
"context_token": "ctx1",
}
)
message = sink.messages[0]
assert message.content == "hello"
assert message.session_id == "weixin-main:wx-main:wx_user"
assert message.channel_identity.peer_type == "dm"
assert message.metadata["context_token"] == "ctx1"
asyncio.run(run())
def test_weixin_group_message_is_best_effort() -> None:
async def run() -> None:
sink = FakeSink()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={"groupPolicy": "open"},
client=FakeWeixinClient(),
)
await adapter.handle_message_payload(
{
"id": "m2",
"from": "wx_user",
"room_id": "room1",
"type": "text",
"text": "hello room",
"context_token": "ctx2",
}
)
message = sink.messages[0]
assert message.session_id == "weixin-main:wx-main:room1"
assert message.channel_identity.peer_type == "group"
assert message.channel_identity.user_id == "wx_user"
asyncio.run(run())
def test_weixin_sends_text_with_context_token() -> None:
async def run() -> None:
sink = FakeSink()
client = FakeWeixinClient()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={},
client=client,
)
await adapter.handle_message_payload(
{
"id": "m1",
"from": "wx_user",
"type": "text",
"text": "hello",
"context_token": "ctx1",
}
)
await adapter.send(
OutboundMessage(
channel="weixin-main",
content="ok",
session_id=sink.messages[0].session_id,
finish_reason="stop",
channel_identity=sink.messages[0].channel_identity,
metadata={"inbound_metadata": sink.messages[0].metadata},
)
)
assert client.sent == [{"peer_id": "wx_user", "text": "ok", "context_token": "ctx1"}]
asyncio.run(run())
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_weixin_channel_adapter.py -q
Expected first result:
ModuleNotFoundError
-
Implement
WeixinAdapter:- Direct messages use
peer_id=from,peer_type="dm". - Group messages use
peer_id=room_id,peer_type="group",user_id=from. - Default
dmPolicyisopen. - Default
groupPolicyisdisabled. - Preserve
context_tokenin inbound metadata. send()readscontext_tokenfrommessage.metadata["inbound_metadata"]when available.- Live polling setup uses configured
baseUrl,cdnBaseUrl, andtoken. - QR credential loading is represented as explicit config/state read inside the live client builder.
- Direct messages use
-
Run:
cd app-instance/backend
uv run pytest tests/unit/test_weixin_channel_adapter.py -q
Expected result:
3 passed
Task 7: Import Guards and Optional Extras
Files:
- Modify
app-instance/backend/pyproject.toml - Modify
app-instance/backend/tests/unit/test_imports.py
Steps
-
Update
pyproject.tomloptional extras exactly as described in Dependency Strategy. -
Add import tests:
def test_platform_channel_modules_import_without_live_clients() -> None:
from beaver.interfaces.channels.platforms.feishu import FeishuAdapter
from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter
from beaver.interfaces.channels.platforms.telegram import TelegramAdapter
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter
assert FeishuAdapter.KIND == "feishu"
assert QQBotAdapter.KIND == "qqbot"
assert TelegramAdapter.KIND == "telegram"
assert WeixinAdapter.KIND == "weixin"
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_imports.py -q
Expected result:
passed
Task 8: Runtime Startup Isolation
Files:
- Modify
app-instance/backend/tests/unit/test_channel_runtime.py - Modify adapter files as needed
Steps
- Add a startup isolation test:
def test_channel_runtime_platform_start_failure_does_not_stop_other_channels(tmp_path) -> None:
async def run() -> None:
runtime = ChannelRuntime(
service=FakeAgentService(),
workspace=tmp_path,
channels={
"telegram-main": ChannelConfig(
enabled=True,
kind="telegram",
mode="polling",
account_id="bot-main",
secrets={},
),
"off": ChannelConfig(
enabled=False,
kind="weixin",
mode="polling",
account_id="wx-main",
),
},
)
await runtime.start()
try:
by_id = {item["channel_id"]: item for item in runtime.statuses()}
finally:
await runtime.stop()
assert by_id["telegram-main"]["state"] == "error"
assert "botToken" in by_id["telegram-main"]["last_error"]
assert by_id["off"]["state"] == "disabled"
asyncio.run(run())
- Ensure each live adapter builder validates required secrets only when
start()needs a live client:
def _require_secret(self, key: str) -> str:
value = self.secrets.get(key)
if not value:
raise ValueError(f"{key} is required")
return str(value)
- Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
Expected result:
passed
Task 9: Cross-Channel Verification
Files: no new files unless a previous task reveals a test gap.
Steps
- Run all focused channel tests:
cd app-instance/backend
uv run pytest \
tests/unit/test_platform_channel_helpers.py \
tests/unit/test_telegram_channel_adapter.py \
tests/unit/test_feishu_channel_adapter.py \
tests/unit/test_qqbot_channel_adapter.py \
tests/unit/test_weixin_channel_adapter.py \
tests/unit/test_channel_runtime.py \
tests/unit/test_imports.py \
-q
Expected result:
passed
- Run the broader backend unit suite:
cd app-instance/backend
uv run pytest tests/unit -q
Expected result:
passed
- Inspect changed files:
git status --short
git diff -- app-instance/backend/beaver/interfaces/channels app-instance/backend/tests/unit app-instance/backend/pyproject.toml
-
Confirm these invariants manually from the diff:
ChannelRuntimestill ownsMessageBusand agent dispatch.- Platform adapters call only
ChannelInboundSink.accept_inbound()for inbound routing. - Platform adapters do not call
AgentService. - Optional platform packages are imported only inside live client or live transport builders.
- Tests use fake clients and fake payloads.
- No adapter sends when the outbound target cannot be resolved; it marks
delivery_status="unclaimed"instead.
Implementation Order
- Task 1: Shared Platform Helpers
- Task 3: TelegramAdapter
- Task 4: FeishuAdapter
- Task 5: QQBotAdapter
- Task 6: WeixinAdapter
- Task 2: Runtime Factory and Status
- Task 7: Import Guards and Optional Extras
- Task 8: Runtime Startup Isolation
- Task 9: Cross-Channel Verification
This order proves the shared adapter pattern with Telegram first, then adds the two official bot-style channels, leaves Weixin adapter behavior last because context tokens and login state make it the most specialized, and only then connects the concrete modules to the runtime factory.