from __future__ import annotations import base64 import hashlib import json import random import threading import time from collections.abc import Callable from typing import Any from urllib.parse import quote from uuid import uuid4 import httpx import qrcode import qrcode.image.svg from external_connector.providers.fake import _session_view from external_connector.state import ConnectorSessionState, SidecarStateStore BridgePoster = Callable[[str, dict[str, Any], dict[str, str]], None] class WeixinIlinkProvider: provider_id = "weixin_ilink" fixed_base_url = "https://ilinkai.weixin.qq.com" app_client_version = str((2 << 16) | (4 << 8) | 3) def __init__( self, *, store: SidecarStateStore, http_client: Any | None = None, bridge_base_url: str, bridge_token: str, bridge_post: BridgePoster | None = None, start_receivers: bool = True, ) -> None: self.store = store self.http = http_client or httpx.Client(timeout=40) self.bridge_base_url = bridge_base_url.rstrip("/") self.bridge_token = bridge_token self.bridge_post = bridge_post or self._default_bridge_post self.start_receivers = start_receivers self._receiver_stops: dict[str, threading.Event] = {} self._receiver_lock = threading.Lock() self._start_existing_connected_receivers() def connectors(self) -> list[dict[str, Any]]: return [ { "kind": "weixin", "displayName": "Weixin", "authType": "qr", "providerId": self.provider_id, "capabilities": ["receive_text", "send_text", "direct_messages"], } ] def health(self) -> dict[str, Any]: return {"ok": True, "providerId": self.provider_id} def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: kind = str(payload["kind"]) if kind != "weixin": raise KeyError(f"Unsupported connector kind: {kind}") session = self.store.create_session( kind=kind, connection_id=str(payload["connectionId"]), channel_id=str(payload["channelId"]), display_name=str(payload["displayName"]), options=dict(payload.get("options") or {}), ) response = self._post_json( self.fixed_base_url, "ilink/bot/get_bot_qrcode?bot_type=3", {"local_token_list": []}, token=None, timeout=20, ) qr_code = str(response.get("qrcode") or "") qr_url = str(response.get("qrcode_img_content") or "") session = self.store.update_session( session.session_id, status="qr_ready", qr_code=qr_url, qr_image=_qr_svg_data_uri(qr_url), metadata={ "qrcode": qr_code, "qrBaseUrl": self.fixed_base_url, "bridgeBaseUrl": str(payload.get("callbackBaseUrl") or self.bridge_base_url), "getUpdatesBuf": "", }, ) return _session_view(session) def get_session(self, session_id: str) -> dict[str, Any]: session = self.store.get_session(session_id) if session.kind != "weixin": raise KeyError(session_id) if session.kind == "weixin" and session.status not in {"expired", "error", "cancelled"} and _has_connection_material(session): if session.status != "connected": session = self.store.update_session(session.session_id, status="connected") self._ensure_receiver(session) return _session_view(session) if session.status in {"connected", "expired", "error", "cancelled"}: if session.status == "connected": self._ensure_receiver(session) return _session_view(session) qrcode = str(session.metadata.get("qrcode") or "") if not qrcode: return _session_view(session) endpoint = f"ilink/bot/get_qrcode_status?qrcode={quote(qrcode)}" response = self._get_json(self.fixed_base_url, endpoint, timeout=40) status = str(response.get("status") or "wait") session = self._apply_login_status(session, response, status) if session.status == "connected": self._ensure_receiver(session) return _session_view(session) def cancel_session(self, session_id: str) -> None: session = self.store.get_session(session_id) if session.kind != "weixin": raise KeyError(session_id) session = self.store.update_session(session_id, status="cancelled") self._stop_receiver(session.connection_id) def logout(self, connection_id: str) -> None: self._stop_receiver(connection_id) try: session = self.store.find_session_by_connection_id(connection_id) except KeyError: return None self.store.update_session(session.session_id, status="cancelled") return None def send(self, payload: dict[str, Any]) -> dict[str, Any]: begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"])) if not begin.should_send: if begin.http_status == 409: return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409} return {"ok": True, "providerMessageId": begin.provider_message_id} session = self.store.find_session_by_connection_id(str(payload["connectionId"])) token = str(session.metadata.get("token") or "") base_url = str(session.metadata.get("baseUrl") or "") if not token or not base_url: return {"ok": False, "error": "Weixin connection is not connected"} target = dict(payload.get("target") or {}) metadata = dict(payload.get("metadata") or {}) peer_id = str(target.get("peerId") or "") request_id = str(payload["requestId"]) client_id = _client_id(request_id) message_body = { "from_user_id": "", "to_user_id": peer_id, "client_id": client_id, "message_type": 2, "message_state": 2, "item_list": [{"type": 1, "text_item": {"text": str(payload.get("content") or "")}}], } context_token = _optional_text(metadata.get("contextToken") or metadata.get("context_token")) if not context_token: context_token = _cached_context_token(session, peer_id) if context_token: message_body["context_token"] = context_token print( json.dumps( { "event": "weixin_send_attempt", "connectionId": payload["connectionId"], "peerId": peer_id, "requestId": request_id, "clientId": client_id, "hasContextToken": bool(context_token), "textLength": len(str(payload.get("content") or "")), }, ensure_ascii=False, ), flush=True, ) try: response = self._post_json( base_url, "ilink/bot/sendmessage", {"msg": message_body}, token=token, timeout=20, ) except Exception as exc: error = str(exc) self.store.fail_send(begin.dedupe_key, error=error) return {"ok": False, "error": error, "httpStatus": 502} error = _business_error(response) if error: self.store.fail_send(begin.dedupe_key, error=error) return {"ok": False, "error": error, "httpStatus": 502} print( json.dumps( { "event": "weixin_send_result", "connectionId": payload["connectionId"], "requestId": request_id, "responseKeys": sorted(str(key) for key in response.keys()), "businessError": None, }, ensure_ascii=False, ), flush=True, ) provider_message_id = f"weixin_{payload['requestId']}" self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id) return {"ok": True, "providerMessageId": provider_message_id} def poll_once(self, connection_id: str) -> None: session = self.store.find_session_by_connection_id(connection_id) token = str(session.metadata.get("token") or "") base_url = str(session.metadata.get("baseUrl") or "") if not token or not base_url: return None metadata = dict(session.metadata) response = self._post_json( base_url, "ilink/bot/getupdates", {"get_updates_buf": str(metadata.get("getUpdatesBuf") or "")}, token=token, timeout=40, ) metadata["getUpdatesBuf"] = str(response.get("get_updates_buf") or metadata.get("getUpdatesBuf") or "") for message in response.get("msgs") or []: if isinstance(message, dict): _remember_context_token(metadata, message) event = _bridge_event_from_weixin_message(session, message) print( json.dumps( { "event": "weixin_inbound_message", "connectionId": session.connection_id, "messageId": event["messageId"], "peerId": event["peerId"], "hasContextToken": bool(event["metadata"].get("contextToken")), "textLength": len(event["content"]), }, ensure_ascii=False, ), flush=True, ) self.bridge_post( f"{_bridge_base_url(session, self.bridge_base_url)}/api/channel-connector-bridge/events", event, {"Authorization": f"Bearer {self.bridge_token}"}, ) self.store.update_session(session.session_id, metadata=metadata) return None def _apply_login_status( self, session: ConnectorSessionState, response: dict[str, Any], status: str, ) -> ConnectorSessionState: if status == "wait": return self.store.update_session(session.session_id, status="qr_ready") if status == "scaned": return self.store.update_session(session.session_id, status="scanned") if status == "expired": return self.store.update_session(session.session_id, status="expired", error="QR code expired") if status == "confirmed": token = str(response.get("bot_token") or "") account_id = str(response.get("ilink_bot_id") or "") base_url = str(response.get("baseurl") or self.fixed_base_url) if not token or not account_id: return self.store.update_session(session.session_id, status="confirmed") metadata = dict(session.metadata) metadata.update( { "token": token, "baseUrl": base_url, "userId": str(response.get("ilink_user_id") or ""), "getUpdatesBuf": str(metadata.get("getUpdatesBuf") or ""), } ) return self.store.update_session( session.session_id, status="connected", account_id=f"weixin:{account_id}", metadata=metadata, ) if status == "need_verifycode": return self.store.update_session( session.session_id, status="waiting_for_user", instructions=["Enter the verification code shown in Weixin, then refresh status."], ) return self.store.update_session(session.session_id, status="waiting_for_user") def _ensure_receiver(self, session: ConnectorSessionState) -> None: if not self.start_receivers: return None with self._receiver_lock: if session.connection_id in self._receiver_stops: return None stop = threading.Event() self._receiver_stops[session.connection_id] = stop thread = threading.Thread(target=self._receiver_loop, args=(session.connection_id, stop), daemon=True) thread.start() print( json.dumps( { "event": "weixin_receiver_started", "connectionId": session.connection_id, "channelId": session.channel_id, }, ensure_ascii=False, ), flush=True, ) return None def _start_existing_connected_receivers(self) -> None: if not self.start_receivers: return None for session in self.store.list_sessions(): if session.kind != "weixin" or session.status != "connected": continue if _has_connection_material(session): self._ensure_receiver(session) return None def _receiver_loop(self, connection_id: str, stop: threading.Event) -> None: while not stop.is_set(): try: self.poll_once(connection_id) except Exception as exc: print( json.dumps( { "event": "weixin_receiver_error", "connectionId": connection_id, "error": str(exc)[:300], }, ensure_ascii=False, ), flush=True, ) time.sleep(5) stop.wait(1) def _stop_receiver(self, connection_id: str) -> None: with self._receiver_lock: stop = self._receiver_stops.pop(connection_id, None) if stop is not None: stop.set() def _post_json( self, base_url: str, endpoint: str, body: dict[str, Any], *, token: str | None, timeout: float, ) -> dict[str, Any]: response = self.http.post( _url(base_url, endpoint), json={**body, "base_info": _base_info()}, headers=_headers(token), timeout=timeout, ) response.raise_for_status() return dict(response.json()) def _get_json(self, base_url: str, endpoint: str, *, timeout: float) -> dict[str, Any]: response = self.http.get(_url(base_url, endpoint), headers=_common_headers(), timeout=timeout) response.raise_for_status() return dict(response.json()) def _default_bridge_post(self, url: str, payload: dict[str, Any], headers: dict[str, str]) -> None: response = self.http.post(url, json=payload, headers=headers, timeout=20) response.raise_for_status() def _url(base_url: str, endpoint: str) -> str: return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" def _bridge_base_url(session: ConnectorSessionState, fallback: str) -> str: return str(session.metadata.get("bridgeBaseUrl") or fallback).rstrip("/") def _base_info() -> dict[str, str]: return {"channel_version": "2.4.3", "bot_agent": "Beaver/1.0"} def _common_headers() -> dict[str, str]: return {"iLink-App-Id": "bot", "iLink-App-ClientVersion": WeixinIlinkProvider.app_client_version} def _headers(token: str | None) -> dict[str, str]: headers = { **_common_headers(), "Content-Type": "application/json", "AuthorizationType": "ilink_bot_token", "X-WECHAT-UIN": base64.b64encode(str(random.getrandbits(32)).encode("utf-8")).decode("ascii"), } if token: headers["Authorization"] = f"Bearer {token}" return headers def _qr_svg_data_uri(value: str) -> str: image = qrcode.make(value, image_factory=qrcode.image.svg.SvgPathImage) raw = image.to_string(encoding="unicode") return "data:image/svg+xml;base64," + base64.b64encode(raw.encode("utf-8")).decode("ascii") def _bridge_event_from_weixin_message(session: ConnectorSessionState, message: dict[str, Any]) -> dict[str, Any]: message_id = str(message.get("message_id") or uuid4().hex) peer_id = str(message.get("from_user_id") or "") text = _extract_text(message) return { "eventId": f"{session.channel_id}:{message_id}", "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "deliveryAttempt": 1, "connectionId": session.connection_id, "channelId": session.channel_id, "kind": "weixin", "accountId": session.account_id, "peerId": peer_id, "peerType": "dm", "userId": peer_id, "threadId": None, "messageId": message_id, "messageType": "text", "content": text, "metadata": {"contextToken": message.get("context_token")}, } def _remember_context_token(metadata: dict[str, Any], message: dict[str, Any]) -> None: peer_id = _optional_text(message.get("from_user_id")) context_token = _optional_text(message.get("context_token")) if not peer_id or not context_token: return None context_tokens = metadata.get("contextTokens") if not isinstance(context_tokens, dict): context_tokens = {} context_tokens[peer_id] = context_token metadata["contextTokens"] = context_tokens return None def _extract_text(message: dict[str, Any]) -> str: for item in message.get("item_list") or []: if not isinstance(item, dict): continue text_item = item.get("text_item") if isinstance(text_item, dict) and text_item.get("text") is not None: return str(text_item["text"]) return "" def _business_error(response: dict[str, Any]) -> str | None: for key in ("ret", "code", "errcode"): if key not in response: continue try: value = int(response.get(key) or 0) except (TypeError, ValueError): value = -1 if value == 0: return None message = response.get("errmsg") or response.get("msg") or response.get("error") or response return str(message) return None def _has_connection_material(session: ConnectorSessionState) -> bool: metadata = dict(session.metadata) return bool(str(metadata.get("token") or "").strip() and str(metadata.get("baseUrl") or "").strip() and session.account_id) def _cached_context_token(session: ConnectorSessionState, peer_id: str) -> str | None: context_tokens = dict(session.metadata.get("contextTokens") or {}) return _optional_text(context_tokens.get(peer_id)) def _client_id(request_id: str) -> str: text = str(request_id).strip() if text and len(text) <= 64 and all(char.isalnum() or char in {"_", "-"} for char in text): return text digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:24] return f"beaver-weixin-{digest}" def _optional_text(value: Any) -> str | None: if value is None: return None text = str(value).strip() return text or None