from __future__ import annotations import json from external_connector.providers.weixin_ilink import WeixinIlinkProvider from external_connector.state import SidecarStateStore class FakeResponse: def __init__(self, payload: dict[str, object]) -> None: self.payload = payload self.text = json.dumps(payload) self.status_code = 200 self.is_success = True def raise_for_status(self) -> None: return None def json(self) -> dict[str, object]: return self.payload class FakeHttpClient: def __init__(self) -> None: self.posts: list[tuple[str, dict[str, object] | None, dict[str, str] | None]] = [] self.gets: list[str] = [] def post(self, url: str, *, json: dict[str, object] | None = None, headers: dict[str, str] | None = None, timeout: float | None = None) -> FakeResponse: self.posts.append((url, json, headers)) if "get_bot_qrcode" in url: return FakeResponse({"qrcode": "qr-token", "qrcode_img_content": "https://scan.example/qr"}) if "sendmessage" in url: return FakeResponse({"ret": 0}) if "getupdates" in url: return FakeResponse( { "ret": 0, "get_updates_buf": "next-buf", "msgs": [ { "message_id": 42, "from_user_id": "wx-user", "to_user_id": "wx-bot", "context_token": "ctx-1", "item_list": [{"type": 1, "text_item": {"text": "hello"}}], } ], } ) raise AssertionError(url) def get(self, url: str, *, headers: dict[str, str] | None = None, timeout: float | None = None) -> FakeResponse: self.gets.append(url) return FakeResponse( { "status": "confirmed", "bot_token": "bot-token", "ilink_bot_id": "bot-1@im.bot", "baseurl": "https://api.weixin.example", "ilink_user_id": "wx-owner", } ) class BusinessFailingSendHttpClient(FakeHttpClient): def post(self, url: str, *, json: dict[str, object] | None = None, headers: dict[str, str] | None = None, timeout: float | None = None) -> FakeResponse: self.posts.append((url, json, headers)) if "sendmessage" in url: return FakeResponse({"ret": 47001, "errmsg": "invalid receiver"}) return super().post(url, json=json, headers=headers, timeout=timeout) 
class ScannedAfterConnectedHttpClient(FakeHttpClient):
    """Client whose status poll is ``confirmed`` once, then ``scaned``."""

    def __init__(self) -> None:
        super().__init__()
        self.get_count = 0

    def get(
        self,
        url: str,
        *,
        headers: dict[str, str] | None = None,
        timeout: float | None = None,
    ) -> FakeResponse:
        """Return the parent's confirmed payload first, ``scaned`` afterwards.

        Every call is counted in ``get_count`` and recorded in ``gets``.
        """
        self.get_count += 1
        if self.get_count == 1:
            # The parent records the URL and returns the confirmed payload.
            return super().get(url, headers=headers, timeout=timeout)
        self.gets.append(url)
        # NOTE(review): the "scaned" spelling is kept exactly as in the
        # fixture; presumably it mirrors the upstream status string - confirm.
        return FakeResponse({"status": "scaned"})


_BRIDGE_BASE_URL = "http://beaver:8080"
_BRIDGE_TOKEN = "bridge-token"


def _session_request() -> dict[str, object]:
    """Build a fresh start-session request shared by the tests below."""
    return {
        "kind": "weixin",
        "connectionId": "conn_1",
        "channelId": "weixin-main",
        "displayName": "Weixin Main",
        "callbackBaseUrl": _BRIDGE_BASE_URL,
        "options": {},
    }


def _make_provider(
    store: SidecarStateStore,
    http: FakeHttpClient,
    **overrides: object,
) -> WeixinIlinkProvider:
    """Construct a provider wired to the fake HTTP client and bridge settings.

    ``overrides`` is forwarded verbatim (e.g. ``start_receivers=False`` or
    ``bridge_post=...``); omitted options keep the constructor's own defaults.
    """
    return WeixinIlinkProvider(
        store=store,
        http_client=http,
        bridge_base_url=_BRIDGE_BASE_URL,
        bridge_token=_BRIDGE_TOKEN,
        **overrides,
    )


def _connect(provider: WeixinIlinkProvider) -> dict[str, object]:
    """Start a session, poll it once and return the polled session."""
    session = provider.start_session(_session_request())
    return provider.get_session(session["sessionId"])


def _seed_token_session(store: SidecarStateStore, status: str):
    """Persist a session that already carries token metadata.

    Returns whatever ``store.update_session`` returns for the seeded session.
    """
    created = store.create_session(
        kind="weixin",
        connection_id="conn_1",
        channel_id="weixin-main",
        display_name="Weixin Main",
        options={},
    )
    return store.update_session(
        created.session_id,
        status=status,
        account_id="weixin:bot-1@im.bot",
        metadata={
            "token": "bot-token",
            "baseUrl": "https://api.weixin.example",
            "userId": "wx-owner",
            "getUpdatesBuf": "buf",
        },
    )


def _send_request(
    request_id: str, metadata: dict[str, object]
) -> dict[str, object]:
    """Build an outbound send payload addressed to ``wx-user``."""
    return {
        "requestId": request_id,
        "connectionId": "conn_1",
        "channelId": "weixin-main",
        "kind": "weixin",
        "target": {"peerId": "wx-user", "peerType": "dm", "threadId": None},
        "content": "reply",
        "metadata": metadata,
    }


def _send_posts(http: FakeHttpClient) -> list:
    """Return the recorded POSTs whose URL contains ``sendmessage``."""
    return [item for item in http.posts if "sendmessage" in item[0]]


def test_weixin_ilink_provider_starts_real_qr_session(tmp_path) -> None:
    """Starting a session yields a QR-ready session backed by the QR endpoint."""
    http = FakeHttpClient()
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"), http, start_receivers=False
    )

    session = provider.start_session(_session_request())

    assert session["status"] == "qr_ready"
    assert session["qrCode"] == "https://scan.example/qr"
    assert session["qrImage"].startswith("data:image/svg+xml;base64,")
    assert http.posts[0][0].endswith("/ilink/bot/get_bot_qrcode?bot_type=3")


def test_weixin_ilink_provider_connects_on_confirmed_status(tmp_path) -> None:
    """A ``confirmed`` poll result moves the session to ``connected``."""
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"),
        FakeHttpClient(),
        start_receivers=False,
    )

    connected = _connect(provider)

    assert connected["status"] == "connected"
    assert connected["accountId"] == "weixin:bot-1@im.bot"
    assert connected["displayName"] == "Weixin Main"


def test_weixin_ilink_provider_does_not_downgrade_token_session_to_scanned(tmp_path) -> None:
    """A later ``scaned`` poll must not downgrade a connected session."""
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"),
        ScannedAfterConnectedHttpClient(),
        start_receivers=False,
    )
    session = provider.start_session(_session_request())

    connected = provider.get_session(session["sessionId"])
    refreshed = provider.get_session(session["sessionId"])

    assert connected["status"] == "connected"
    assert refreshed["status"] == "connected"
    assert refreshed["accountId"] == "weixin:bot-1@im.bot"


def test_weixin_ilink_provider_recovers_token_session_persisted_as_scanned(tmp_path) -> None:
    """A stored ``scanned`` session with token metadata reads as connected."""
    store = SidecarStateStore(tmp_path / "state.json")
    # The provider is built before the session is seeded, as in the
    # scenario under test.
    provider = _make_provider(store, FakeHttpClient(), start_receivers=False)
    session = _seed_token_session(store, "scanned")

    recovered = provider.get_session(session.session_id)

    assert recovered["status"] == "connected"
    assert recovered["accountId"] == "weixin:bot-1@im.bot"


def test_weixin_ilink_provider_starts_existing_connected_receiver_on_startup(tmp_path) -> None:
    """A connected session found at construction gets a receiver registered."""
    store = SidecarStateStore(tmp_path / "state.json")
    _seed_token_session(store, "connected")

    # No ``start_receivers`` override: this test relies on the constructor's
    # default behaviour.
    provider = _make_provider(store, FakeHttpClient())

    try:
        assert "conn_1" in provider._receiver_stops
    finally:
        # Always log out so a failing assertion does not skip the cleanup
        # of whatever the provider started for this connection.
        provider.logout("conn_1")


def test_weixin_ilink_provider_send_uses_saved_token_and_dedupes(tmp_path) -> None:
    """Sending the same request twice posts once, using the saved bot token."""
    http = FakeHttpClient()
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"), http, start_receivers=False
    )
    _connect(provider)
    payload = _send_request("out_1", {"contextToken": "ctx-1"})

    first = provider.send(payload)
    second = provider.send(payload)

    send_posts = _send_posts(http)
    assert first == second
    assert first["ok"] is True
    assert len(send_posts) == 1
    _, body, headers = send_posts[0]
    assert headers["Authorization"] == "Bearer bot-token"
    msg = body["msg"]
    assert msg["from_user_id"] == ""
    assert msg["to_user_id"] == "wx-user"
    assert msg["client_id"] == "out_1"
    assert msg["message_type"] == 2
    assert msg["message_state"] == 2
    assert msg["context_token"] == "ctx-1"
    assert msg["item_list"][0]["text_item"]["text"] == "reply"


def test_weixin_ilink_provider_send_uses_cached_context_token(tmp_path) -> None:
    """A context token seen while polling is reused when the send omits one."""
    http = FakeHttpClient()
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"),
        http,
        bridge_post=lambda url, payload, headers: None,
        start_receivers=False,
    )
    _connect(provider)
    provider.poll_once("conn_1")

    result = provider.send(_send_request("out_2", {}))

    send_posts = _send_posts(http)
    assert result["ok"] is True
    assert send_posts[-1][1]["msg"]["context_token"] == "ctx-1"


def test_weixin_ilink_provider_send_uses_safe_client_id_for_platform(tmp_path) -> None:
    """Request ids containing ``:`` or ``@`` are replaced by a safe client id."""
    http = FakeHttpClient()
    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"), http, start_receivers=False
    )
    _connect(provider)

    provider.send(
        _send_request(
            "out_weixin-main:account@im.bot:peer@im.wechat:msg-1",
            {"contextToken": "ctx-1"},
        )
    )

    client_id = _send_posts(http)[-1][1]["msg"]["client_id"]
    assert client_id.startswith("beaver-weixin-")
    assert ":" not in client_id
    assert "@" not in client_id


def test_weixin_ilink_provider_send_rejects_business_error_without_completing(tmp_path) -> None:
    """A business-level send error is reported and leaves the request retryable."""
    store = SidecarStateStore(tmp_path / "state.json")
    provider = _make_provider(
        store, BusinessFailingSendHttpClient(), start_receivers=False
    )
    _connect(provider)

    result = provider.send(_send_request("out_1", {}))
    retry = store.begin_send(connection_id="conn_1", request_id="out_1")

    assert result["ok"] is False
    assert "invalid receiver" in result["error"]
    assert retry.should_send is True


def test_weixin_ilink_provider_poll_once_forwards_bridge_event(tmp_path) -> None:
    """Polling forwards the received message to the bridge events endpoint."""
    bridge_posts: list[tuple[str, dict[str, object], dict[str, str]]] = []

    def bridge_post(
        url: str, payload: dict[str, object], headers: dict[str, str]
    ) -> None:
        bridge_posts.append((url, payload, headers))

    provider = _make_provider(
        SidecarStateStore(tmp_path / "state.json"),
        FakeHttpClient(),
        bridge_post=bridge_post,
        start_receivers=False,
    )
    _connect(provider)

    provider.poll_once("conn_1")

    url, event, headers = bridge_posts[0]
    assert url == "http://beaver:8080/api/channel-connector-bridge/events"
    assert headers["Authorization"] == "Bearer bridge-token"
    assert event["eventId"] == "weixin-main:42"
    assert event["content"] == "hello"
    assert event["peerId"] == "wx-user"