import asyncio from beaver.foundation.events import OutboundMessage from beaver.interfaces.channels.platforms.telegram import TelegramAdapter class FakeSink: def __init__(self) -> None: self.messages = [] async def accept_inbound(self, message): self.messages.append(message) class FakeTelegramClient: def __init__(self) -> None: self.sent = [] async def send_message(self, **kwargs): self.sent.append(kwargs) def test_telegram_normalizes_private_text_message() -> None: async def run() -> None: sink = FakeSink() adapter = TelegramAdapter( channel_id="telegram-main", kind="telegram", mode="polling", account_id="bot-main", display_name=None, inbound_sink=sink, secrets={"botToken": "x"}, config={}, client=FakeTelegramClient(), ) await adapter.handle_update_payload( { "message": { "message_id": 100, "text": "hello", "chat": {"id": 200, "type": "private"}, "from": {"id": 300, "username": "ivan"}, } } ) message = sink.messages[0] assert message.channel == "telegram-main" assert message.content == "hello" assert message.session_id == "telegram-main:bot-main:200" assert message.channel_identity.peer_type == "dm" assert message.channel_identity.user_id == "300" assert message.channel_identity.message_id == "100" asyncio.run(run()) def test_telegram_group_requires_mention_when_configured() -> None: async def run() -> None: sink = FakeSink() adapter = TelegramAdapter( channel_id="telegram-main", kind="telegram", mode="polling", account_id="bot-main", display_name=None, inbound_sink=sink, secrets={"botToken": "x"}, config={"requireMentionInGroups": True, "botUsername": "beaver_bot"}, client=FakeTelegramClient(), ) await adapter.handle_update_payload( { "message": { "message_id": 101, "text": "hello group", "chat": {"id": -20, "type": "group"}, "from": {"id": 300}, } } ) await adapter.handle_update_payload( { "message": { "message_id": 102, "text": "@beaver_bot hello", "chat": {"id": -20, "type": "group"}, "from": {"id": 300}, } } ) assert len(sink.messages) == 1 assert sink.messages[0].content == "hello" asyncio.run(run()) def test_telegram_sends_chunked_reply_to_identity_target() -> None: async def run() -> None: sink = FakeSink() client = FakeTelegramClient() adapter = TelegramAdapter( channel_id="telegram-main", kind="telegram", mode="polling", account_id="bot-main", display_name=None, inbound_sink=sink, secrets={"botToken": "x"}, config={"maxMessageChars": 3}, client=client, ) await adapter.handle_update_payload( { "message": { "message_id": 100, "text": "hello", "chat": {"id": 200, "type": "private"}, "from": {"id": 300}, } } ) await adapter.send( OutboundMessage( channel="telegram-main", content="abcdef", session_id=sink.messages[0].session_id, finish_reason="stop", channel_identity=sink.messages[0].channel_identity, ) ) assert [item["text"] for item in client.sent] == ["abc", "def"] assert client.sent[0]["chat_id"] == "200" asyncio.run(run())