import asyncio from beaver.foundation.events import OutboundMessage from beaver.interfaces.channels.platforms.weixin import WeixinAdapter class FakeSink: def __init__(self) -> None: self.messages = [] async def accept_inbound(self, message): self.messages.append(message) class FakeWeixinClient: def __init__(self) -> None: self.sent = [] async def send_text(self, *, peer_id: str, text: str, context_token: str | None): self.sent.append({"peer_id": peer_id, "text": text, "context_token": context_token}) def test_weixin_normalizes_direct_text_message() -> None: async def run() -> None: sink = FakeSink() adapter = WeixinAdapter( channel_id="weixin-main", kind="weixin", mode="polling", account_id="wx-main", display_name=None, inbound_sink=sink, secrets={"token": "token"}, config={}, client=FakeWeixinClient(), ) await adapter.handle_message_payload( { "id": "m1", "from": "wx_user", "room_id": "", "type": "text", "text": "hello", "context_token": "ctx1", } ) message = sink.messages[0] assert message.content == "hello" assert message.session_id == "weixin-main:wx-main:wx_user" assert message.channel_identity.peer_type == "dm" assert message.metadata["context_token"] == "ctx1" asyncio.run(run()) def test_weixin_group_message_is_best_effort() -> None: async def run() -> None: sink = FakeSink() adapter = WeixinAdapter( channel_id="weixin-main", kind="weixin", mode="polling", account_id="wx-main", display_name=None, inbound_sink=sink, secrets={"token": "token"}, config={"groupPolicy": "open"}, client=FakeWeixinClient(), ) await adapter.handle_message_payload( { "id": "m2", "from": "wx_user", "room_id": "room1", "type": "text", "text": "hello room", "context_token": "ctx2", } ) message = sink.messages[0] assert message.session_id == "weixin-main:wx-main:room1" assert message.channel_identity.peer_type == "group" assert message.channel_identity.user_id == "wx_user" asyncio.run(run()) def test_weixin_sends_text_with_context_token() -> None: async def run() -> None: sink = FakeSink() client = FakeWeixinClient() adapter = WeixinAdapter( channel_id="weixin-main", kind="weixin", mode="polling", account_id="wx-main", display_name=None, inbound_sink=sink, secrets={"token": "token"}, config={}, client=client, ) await adapter.handle_message_payload( { "id": "m1", "from": "wx_user", "type": "text", "text": "hello", "context_token": "ctx1", } ) await adapter.send( OutboundMessage( channel="weixin-main", content="ok", session_id=sink.messages[0].session_id, finish_reason="stop", channel_identity=sink.messages[0].channel_identity, metadata={"inbound_metadata": sink.messages[0].metadata}, ) ) assert client.sent == [{"peer_id": "wx_user", "text": "ok", "context_token": "ctx1"}] asyncio.run(run())