Files
beaver_project/app-instance/backend/tests/unit/test_weixin_channel_adapter.py

130 lines
3.7 KiB
Python

import asyncio
from beaver.foundation.events import OutboundMessage
from beaver.interfaces.channels.platforms.weixin import WeixinAdapter
class FakeSink:
def __init__(self) -> None:
self.messages = []
async def accept_inbound(self, message):
self.messages.append(message)
class FakeWeixinClient:
def __init__(self) -> None:
self.sent = []
async def send_text(self, *, peer_id: str, text: str, context_token: str | None):
self.sent.append({"peer_id": peer_id, "text": text, "context_token": context_token})
def test_weixin_normalizes_direct_text_message() -> None:
async def run() -> None:
sink = FakeSink()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={},
client=FakeWeixinClient(),
)
await adapter.handle_message_payload(
{
"id": "m1",
"from": "wx_user",
"room_id": "",
"type": "text",
"text": "hello",
"context_token": "ctx1",
}
)
message = sink.messages[0]
assert message.content == "hello"
assert message.session_id == "weixin-main:wx-main:wx_user"
assert message.channel_identity.peer_type == "dm"
assert message.metadata["context_token"] == "ctx1"
asyncio.run(run())
def test_weixin_group_message_is_best_effort() -> None:
async def run() -> None:
sink = FakeSink()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={"groupPolicy": "open"},
client=FakeWeixinClient(),
)
await adapter.handle_message_payload(
{
"id": "m2",
"from": "wx_user",
"room_id": "room1",
"type": "text",
"text": "hello room",
"context_token": "ctx2",
}
)
message = sink.messages[0]
assert message.session_id == "weixin-main:wx-main:room1"
assert message.channel_identity.peer_type == "group"
assert message.channel_identity.user_id == "wx_user"
asyncio.run(run())
def test_weixin_sends_text_with_context_token() -> None:
async def run() -> None:
sink = FakeSink()
client = FakeWeixinClient()
adapter = WeixinAdapter(
channel_id="weixin-main",
kind="weixin",
mode="polling",
account_id="wx-main",
display_name=None,
inbound_sink=sink,
secrets={"token": "token"},
config={},
client=client,
)
await adapter.handle_message_payload(
{
"id": "m1",
"from": "wx_user",
"type": "text",
"text": "hello",
"context_token": "ctx1",
}
)
await adapter.send(
OutboundMessage(
channel="weixin-main",
content="ok",
session_id=sink.messages[0].session_id,
finish_reason="stop",
channel_identity=sink.messages[0].channel_identity,
metadata={"inbound_metadata": sink.messages[0].metadata},
)
)
assert client.sent == [{"peer_id": "wx_user", "text": "ok", "context_token": "ctx1"}]
asyncio.run(run())