From d335199a64c2fcf75663716d444c0b265599408d Mon Sep 17 00:00:00 2001 From: steven_li Date: Wed, 3 Jun 2026 09:24:06 +0800 Subject: [PATCH] docs: add external connector implementation plans --- ...6-03-external-connector-backend-runtime.md | 1599 +++++++++++++++++ ...6-03-external-connector-frontend-deploy.md | 790 ++++++++ .../2026-06-03-external-connector-sidecar.md | 1167 ++++++++++++ 3 files changed, 3556 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md create mode 100644 docs/superpowers/plans/2026-06-03-external-connector-frontend-deploy.md create mode 100644 docs/superpowers/plans/2026-06-03-external-connector-sidecar.md diff --git a/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md b/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md new file mode 100644 index 0000000..1e818f0 --- /dev/null +++ b/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md @@ -0,0 +1,1599 @@ +# External Connector Backend Runtime Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add Beaver backend support for sidecar-backed Weixin and Feishu connector sessions, bridge-event dedupe, outbound sidecar delivery, and dynamic runtime channel activation. + +**Architecture:** Beaver depends only on a generic connector HTTP contract. Sidecar-backed connections materialize as `ChannelConfig(kind="external_connector", mode="http")`, are registered dynamically through `ChannelRuntime.add_channel()`, and accept inbound events through an authenticated bridge endpoint with `MessageDedupeStore` idempotency. + +**Tech Stack:** Python dataclasses, FastAPI, Pydantic v2, local JSON stores, pytest, existing Beaver channel runtime. + +--- + +## Scope + +Included: + +- JSON-backed bridge event dedupe with 60-second stale processing TTL. +- `ExternalConnectorChannel` adapter for outbound `/send` calls with stable `requestId`. +- Runtime and manager dynamic add/remove channel support. +- Beaver sidecar HTTP client. +- Weixin and Feishu connector registry entries backed by fake sidecar clients in tests. +- Backend APIs for connector sessions and bridge inbound events. + +Excluded: + +- Sidecar process implementation. +- Frontend connector wizard. +- Docker compose integration. +- Live vendor CLI verification. + +## File Structure + +- Create `app-instance/backend/beaver/interfaces/channels/connections/dedupe.py` + - `ConnectorMessageDedupeRecord` and `MessageDedupeStore`. +- Create `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py` + - Async HTTP client for generic sidecar contract. +- Create `app-instance/backend/beaver/interfaces/channels/connections/external.py` + - `ExternalConnectorBase`, `WeixinConnector`, and `FeishuConnector`. +- Create `app-instance/backend/beaver/interfaces/channels/external_connector.py` + - Runtime adapter that sends outbound messages to the sidecar. +- Modify `app-instance/backend/beaver/interfaces/channels/manager.py` + - Allow dynamic register/unregister while manager is started. +- Modify `app-instance/backend/beaver/interfaces/channels/runtime.py` + - Add lifecycle lock, `add_channel()`, `remove_channel()`, and adapter factory support for `external_connector/http`. +- Modify `app-instance/backend/beaver/interfaces/channels/connections/connectors.py` + - Materialize sidecar-backed connections and optionally activate runtime. +- Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py` + - Export new dedupe, client, and connector symbols. +- Modify `app-instance/backend/beaver/interfaces/web/schemas/chat.py` + - Add connector session and bridge schemas. +- Modify `app-instance/backend/beaver/interfaces/web/schemas/__init__.py` + - Export new schemas. +- Modify `app-instance/backend/beaver/interfaces/web/app.py` + - Register Weixin/Feishu connectors, sidecar settings, connector session routes, and bridge endpoint. +- Test `app-instance/backend/tests/unit/test_connector_message_dedupe_store.py` +- Test `app-instance/backend/tests/unit/test_external_connector_channel.py` +- Test `app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py` +- Test `app-instance/backend/tests/unit/test_external_connector_bridge_api.py` +- Test `app-instance/backend/tests/unit/test_external_sidecar_connectors.py` + +--- + +### Task 1: Message Dedupe Store + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/connections/dedupe.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/connections/__init__.py` +- Test: `app-instance/backend/tests/unit/test_connector_message_dedupe_store.py` + +- [ ] **Step 1: Write failing dedupe tests** + +Create `app-instance/backend/tests/unit/test_connector_message_dedupe_store.py`: + +```python +from __future__ import annotations + +from beaver.interfaces.channels.connections import MessageDedupeStore + + +def test_message_dedupe_store_completes_and_dedupes_completed(tmp_path) -> None: + store = MessageDedupeStore(tmp_path / "message_dedupe.json") + + first = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=1) + store.complete(first.dedupe_key, message_id="msg_1") + duplicate = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=2) + + assert first.should_process is True + assert duplicate.should_process is False + assert duplicate.status == "completed" + assert duplicate.http_status == 200 + + +def test_message_dedupe_store_returns_conflict_for_active_processing(tmp_path) -> None: + store = MessageDedupeStore(tmp_path / "message_dedupe.json", processing_ttl_seconds=60) + + store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=1) + duplicate = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=2) + + assert duplicate.should_process is False + assert duplicate.status == "processing" + assert duplicate.http_status == 409 + assert duplicate.retry_after_seconds == 5 + + +def test_message_dedupe_store_reprocesses_stale_processing(tmp_path) -> None: + store = MessageDedupeStore(tmp_path / "message_dedupe.json", processing_ttl_seconds=0) + + store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=1) + stale = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=2) + + assert stale.should_process is True + assert stale.status == "processing" + assert stale.record.delivery_attempts == 2 + + +def test_message_dedupe_store_reprocesses_failed_records(tmp_path) -> None: + store = MessageDedupeStore(tmp_path / "message_dedupe.json") + + first = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=1) + store.fail(first.dedupe_key, error="runtime rejected") + retry = store.begin(connection_id="conn_1", event_id="evt_1", delivery_attempt=2) + + assert retry.should_process is True + assert retry.record.delivery_attempts == 2 + assert retry.record.last_error is None +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_connector_message_dedupe_store.py -q +``` + +Expected: fail with `ImportError: cannot import name 'MessageDedupeStore'`. + +- [ ] **Step 3: Implement dedupe models and store** + +Create `app-instance/backend/beaver/interfaces/channels/connections/dedupe.py` with: + +```python +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from pathlib import Path +from threading import Lock +from typing import Any + + +def _iso_now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _parse_iso(value: str) -> datetime: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +@dataclass(slots=True) +class ConnectorMessageDedupeRecord: + dedupe_key: str + connection_id: str + event_id: str + status: str + first_seen_at: str + updated_at: str + delivery_attempts: int + message_id: str | None = None + last_error: str | None = None + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ConnectorMessageDedupeRecord": + return cls( + dedupe_key=str(data.get("dedupe_key") or ""), + connection_id=str(data.get("connection_id") or ""), + event_id=str(data.get("event_id") or ""), + status=str(data.get("status") or "processing"), + first_seen_at=str(data.get("first_seen_at") or _iso_now()), + updated_at=str(data.get("updated_at") or _iso_now()), + delivery_attempts=int(data.get("delivery_attempts") or 0), + message_id=str(data["message_id"]) if data.get("message_id") is not None else None, + last_error=str(data["last_error"]) if data.get("last_error") is not None else None, + ) + + +@dataclass(slots=True) +class DedupeBeginResult: + should_process: bool + dedupe_key: str + status: str + http_status: int + retry_after_seconds: int | None + record: ConnectorMessageDedupeRecord + + +class MessageDedupeStore: + def __init__(self, path: Path, *, processing_ttl_seconds: int = 60) -> None: + self.path = Path(path) + self.processing_ttl_seconds = int(processing_ttl_seconds) + self._lock = Lock() + + def begin(self, *, connection_id: str, event_id: str, delivery_attempt: int) -> DedupeBeginResult: + dedupe_key = f"{connection_id}:{event_id}" + now = _iso_now() + with self._lock: + data = self._load() + raw = data["records"].get(dedupe_key) + if isinstance(raw, dict): + record = ConnectorMessageDedupeRecord.from_dict(raw) + if record.status == "completed": + return DedupeBeginResult(False, dedupe_key, record.status, 200, None, record) + if record.status == "processing" and not self._is_stale(record, now): + return DedupeBeginResult(False, dedupe_key, record.status, 409, 5, record) + record.status = "processing" + record.updated_at = now + record.delivery_attempts = max(record.delivery_attempts + 1, int(delivery_attempt)) + record.last_error = None + else: + record = ConnectorMessageDedupeRecord( + dedupe_key=dedupe_key, + connection_id=connection_id, + event_id=event_id, + status="processing", + first_seen_at=now, + updated_at=now, + delivery_attempts=max(1, int(delivery_attempt)), + ) + data["records"][dedupe_key] = record.to_dict() + self._save(data) + return DedupeBeginResult(True, dedupe_key, record.status, 200, None, record) + + def complete(self, dedupe_key: str, *, message_id: str | None) -> ConnectorMessageDedupeRecord: + return self._mark(dedupe_key, status="completed", message_id=message_id, error=None) + + def fail(self, dedupe_key: str, *, error: str) -> ConnectorMessageDedupeRecord: + return self._mark(dedupe_key, status="failed", message_id=None, error=error) + + def _mark( + self, + dedupe_key: str, + *, + status: str, + message_id: str | None, + error: str | None, + ) -> ConnectorMessageDedupeRecord: + with self._lock: + data = self._load() + raw = data["records"].get(dedupe_key) + if not isinstance(raw, dict): + raise KeyError(dedupe_key) + record = ConnectorMessageDedupeRecord.from_dict(raw) + record.status = status + record.updated_at = _iso_now() + record.message_id = message_id or record.message_id + record.last_error = error + data["records"][dedupe_key] = record.to_dict() + self._save(data) + return record + + def _is_stale(self, record: ConnectorMessageDedupeRecord, now: str) -> bool: + age = (_parse_iso(now) - _parse_iso(record.updated_at)).total_seconds() + return age >= self.processing_ttl_seconds + + def _load(self) -> dict[str, Any]: + if not self.path.exists(): + return {"records": {}} + try: + data = json.loads(self.path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {"records": {}} + if not isinstance(data, dict) or not isinstance(data.get("records"), dict): + return {"records": {}} + return data + + def _save(self, data: dict[str, Any]) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = self.path.with_name(f"{self.path.name}.tmp") + tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + tmp_path.replace(self.path) +``` + +- [ ] **Step 4: Export dedupe symbols** + +Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`: + +```python +from .dedupe import ConnectorMessageDedupeRecord, DedupeBeginResult, MessageDedupeStore +``` + +Add these names to `__all__`. + +- [ ] **Step 5: Run dedupe tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_connector_message_dedupe_store.py -q +``` + +Expected: `4 passed`. + +- [ ] **Step 6: Commit Task 1** + +```bash +git add app-instance/backend/beaver/interfaces/channels/connections/dedupe.py app-instance/backend/beaver/interfaces/channels/connections/__init__.py app-instance/backend/tests/unit/test_connector_message_dedupe_store.py +git commit -m "feat: add connector bridge dedupe store" +``` + +--- + +### Task 2: External Connector Channel + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py` +- Create: `app-instance/backend/beaver/interfaces/channels/external_connector.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py` +- Test: `app-instance/backend/tests/unit/test_external_connector_channel.py` + +- [ ] **Step 1: Write failing channel tests** + +Create `app-instance/backend/tests/unit/test_external_connector_channel.py`: + +```python +from __future__ import annotations + +import asyncio + +from beaver.foundation.events import ChannelIdentity, OutboundMessage +from beaver.interfaces.channels.external_connector import ExternalConnectorChannel + + +class FakeSidecarClient: + def __init__(self) -> None: + self.sent: list[dict] = [] + + async def send(self, payload: dict) -> dict: + self.sent.append(payload) + return {"ok": True, "providerMessageId": "provider-1"} + + +def test_external_connector_channel_sends_with_target_and_request_id() -> None: + async def run() -> None: + client = FakeSidecarClient() + channel = ExternalConnectorChannel( + channel_id="weixin-main", + platform_kind="weixin", + connection_id="conn_1", + account_id="weixin:me", + display_name="Weixin Main", + sidecar_client=client, + ) + message = OutboundMessage( + channel="weixin-main", + content="reply", + session_id="s1", + finish_reason="stop", + message_id="out-msg-1", + channel_identity=ChannelIdentity( + channel_id="weixin-main", + kind="weixin", + account_id="weixin:me", + peer_id="peer-1", + peer_type="dm", + thread_id=None, + user_id="sender-1", + message_id="in-msg-1", + ), + ) + + await channel.send(message) + + assert client.sent == [ + { + "requestId": "out_out-msg-1", + "connectionId": "conn_1", + "channelId": "weixin-main", + "kind": "weixin", + "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, + "content": "reply", + "metadata": {"inboundMessageId": "in-msg-1", "sessionId": "s1"}, + } + ] + + asyncio.run(run()) + + +def test_external_connector_channel_requires_identity() -> None: + async def run() -> None: + channel = ExternalConnectorChannel( + channel_id="weixin-main", + platform_kind="weixin", + connection_id="conn_1", + account_id="weixin:me", + display_name="Weixin Main", + sidecar_client=FakeSidecarClient(), + ) + message = OutboundMessage(channel="weixin-main", content="reply", session_id="s1", finish_reason="stop") + + try: + await channel.send(message) + except ValueError as exc: + assert "channel_identity is required" in str(exc) + else: + raise AssertionError("Expected ValueError") + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_channel.py -q +``` + +Expected: fail with `ModuleNotFoundError: No module named 'beaver.interfaces.channels.external_connector'`. + +- [ ] **Step 3: Implement sidecar client** + +Create `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py`: + +```python +from __future__ import annotations + +from typing import Any + +import httpx + + +class ConnectorSidecarClient: + def __init__(self, *, base_url: str, token: str, timeout_seconds: float = 20.0) -> None: + self.base_url = base_url.rstrip("/") + self.token = token + self.timeout_seconds = float(timeout_seconds) + + async def get_connectors(self) -> list[dict[str, Any]]: + return await self._request("GET", "/connectors") + + async def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: + return await self._request("POST", "/connector-sessions", json=payload) + + async def get_session(self, session_id: str) -> dict[str, Any]: + return await self._request("GET", f"/connector-sessions/{session_id}") + + async def cancel_session(self, session_id: str) -> dict[str, Any]: + return await self._request("POST", f"/connector-sessions/{session_id}/cancel", json={}) + + async def logout(self, connection_id: str) -> dict[str, Any]: + return await self._request("POST", f"/connections/{connection_id}/logout", json={}) + + async def send(self, payload: dict[str, Any]) -> dict[str, Any]: + return await self._request("POST", "/send", json=payload) + + async def _request(self, method: str, path: str, *, json: dict[str, Any] | None = None) -> Any: + headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} + async with httpx.AsyncClient(timeout=self.timeout_seconds) as client: + response = await client.request(method, f"{self.base_url}{path}", json=json, headers=headers) + response.raise_for_status() + return response.json() +``` + +- [ ] **Step 4: Implement external channel** + +Create `app-instance/backend/beaver/interfaces/channels/external_connector.py`: + +```python +from __future__ import annotations + +from typing import Any + +from beaver.foundation.events import OutboundMessage +from beaver.interfaces.channels.connections.sidecar_client import ConnectorSidecarClient + + +class ExternalConnectorChannel: + def __init__( + self, + *, + channel_id: str, + platform_kind: str, + connection_id: str, + account_id: str, + display_name: str, + sidecar_client: ConnectorSidecarClient | Any, + ) -> None: + self.channel_id = channel_id + self.kind = "external_connector" + self.mode = "http" + self.platform_kind = platform_kind + self.connection_id = connection_id + self.account_id = account_id + self.display_name = display_name or channel_id + self.sidecar_client = sidecar_client + self.started = False + + async def start(self) -> None: + self.started = True + + async def stop(self) -> None: + self.started = False + + async def send(self, message: OutboundMessage) -> None: + identity = message.channel_identity + if identity is None: + raise ValueError("channel_identity is required for external connector sends") + payload = { + "requestId": _request_id(message), + "connectionId": self.connection_id, + "channelId": self.channel_id, + "kind": self.platform_kind, + "target": { + "peerId": identity.peer_id, + "peerType": identity.peer_type, + "threadId": identity.thread_id, + }, + "content": message.content, + "metadata": { + "inboundMessageId": identity.message_id, + "sessionId": message.session_id, + }, + } + await self.sidecar_client.send(payload) + + +def _request_id(message: OutboundMessage) -> str: + return f"out_{message.message_id}" +``` + +- [ ] **Step 5: Export channel symbol** + +Modify `app-instance/backend/beaver/interfaces/channels/__init__.py`: + +```python +from .external_connector import ExternalConnectorChannel +``` + +Add `ExternalConnectorChannel` to `__all__`. + +- [ ] **Step 6: Run channel tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_channel.py -q +``` + +Expected: `2 passed`. + +- [ ] **Step 7: Commit Task 2** + +```bash +git add app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py app-instance/backend/beaver/interfaces/channels/external_connector.py app-instance/backend/beaver/interfaces/channels/__init__.py app-instance/backend/tests/unit/test_external_connector_channel.py +git commit -m "feat: add external connector channel" +``` + +--- + +### Task 3: Dynamic Runtime Channels + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/channels/manager.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py` +- Test: `app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py` + +- [ ] **Step 1: Write failing dynamic runtime tests** + +Create `app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py`: + +```python +from __future__ import annotations + +import asyncio + +from beaver.foundation.config.schema import ChannelConfig +from beaver.foundation.events import MessageBus, OutboundMessage +from beaver.interfaces.channels.runtime import ChannelRuntime + + +class FakeService: + async def handle_inbound_message(self, inbound): + return OutboundMessage(channel=inbound.channel, content="ok", session_id=inbound.session_id, finish_reason="stop") + + +def test_runtime_add_channel_starts_new_channel_after_runtime_start(tmp_path) -> None: + async def run() -> None: + runtime = ChannelRuntime(service=FakeService(), workspace=tmp_path, channels={}, bus=MessageBus()) + await runtime.start() + try: + await runtime.add_channel( + "webhook-dev", + ChannelConfig(enabled=True, kind="webhook", mode="webhook", account_id="acct"), + ) + assert "webhook-dev" in runtime.adapters + assert runtime.states["webhook-dev"]["state"] == "running" + finally: + await runtime.stop() + + asyncio.run(run()) + + +def test_runtime_add_channel_noops_for_same_config(tmp_path) -> None: + async def run() -> None: + cfg = ChannelConfig(enabled=True, kind="webhook", mode="webhook", account_id="acct") + runtime = ChannelRuntime(service=FakeService(), workspace=tmp_path, channels={}, bus=MessageBus()) + await runtime.start() + try: + await runtime.add_channel("webhook-dev", cfg) + first = runtime.adapters["webhook-dev"] + await runtime.add_channel("webhook-dev", cfg) + assert runtime.adapters["webhook-dev"] is first + finally: + await runtime.stop() + + asyncio.run(run()) + + +def test_runtime_replacement_failure_keeps_old_channel(tmp_path) -> None: + async def run() -> None: + good = ChannelConfig(enabled=True, kind="webhook", mode="webhook", account_id="acct") + bad = ChannelConfig(enabled=True, kind="missing", mode="http", account_id="acct") + runtime = ChannelRuntime(service=FakeService(), workspace=tmp_path, channels={}, bus=MessageBus()) + await runtime.start() + try: + await runtime.add_channel("webhook-dev", good) + old = runtime.adapters["webhook-dev"] + try: + await runtime.add_channel("webhook-dev", bad) + except ValueError: + pass + else: + raise AssertionError("Expected ValueError") + assert runtime.adapters["webhook-dev"] is old + assert runtime.channel_configs["webhook-dev"] == good + assert runtime.states["webhook-dev"]["state"] == "running" + finally: + await runtime.stop() + + asyncio.run(run()) + + +def test_runtime_remove_channel_stops_and_unregisters(tmp_path) -> None: + async def run() -> None: + runtime = ChannelRuntime(service=FakeService(), workspace=tmp_path, channels={}, bus=MessageBus()) + await runtime.start() + try: + await runtime.add_channel( + "webhook-dev", + ChannelConfig(enabled=True, kind="webhook", mode="webhook", account_id="acct"), + ) + await runtime.remove_channel("webhook-dev") + assert "webhook-dev" not in runtime.adapters + assert "webhook-dev" not in runtime.manager.channels + assert runtime.states["webhook-dev"]["state"] == "removed" + finally: + await runtime.stop() + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime_dynamic_channels.py -q +``` + +Expected: fail with `AttributeError: 'ChannelRuntime' object has no attribute 'add_channel'`. + +- [ ] **Step 3: Add dynamic manager methods** + +Modify `app-instance/backend/beaver/interfaces/channels/manager.py`: + +```python + def register(self, channel: ChannelAdapter) -> None: + if channel.channel_id in self.channels: + raise ValueError(f"Channel already registered: {channel.channel_id}") + self.channels[channel.channel_id] = channel + + def unregister(self, channel_id: str) -> ChannelAdapter | None: + return self.channels.pop(channel_id, None) + + def replace_registered(self, channel: ChannelAdapter) -> ChannelAdapter | None: + old = self.channels.get(channel.channel_id) + self.channels[channel.channel_id] = channel + return old +``` + +Keep `start()`, `stop()`, and `dispatch_outbound()` unchanged. + +- [ ] **Step 4: Add runtime lifecycle lock and methods** + +Modify `app-instance/backend/beaver/interfaces/channels/runtime.py`: + +```python + self._lifecycle_lock = asyncio.Lock() +``` + +Add methods to `ChannelRuntime`: + +```python + async def add_channel(self, channel_id: str, config: ChannelConfig) -> None: + async with self._lifecycle_lock: + current = self.channel_configs.get(channel_id) + if current == config and channel_id in self.adapters: + return + if not config.enabled: + await self.remove_channel(channel_id) + self.channel_configs[channel_id] = config + self.states[channel_id] = {"state": "disabled", "last_error": None} + return + adapter = self._build_adapter(channel_id, config) + await adapter.start() + old = self.manager.replace_registered(adapter) + old_adapter = self.adapters.get(channel_id) + self.adapters[channel_id] = adapter + self.channel_configs[channel_id] = config + self.states[channel_id] = {"state": "running", "last_error": None, "started_at": _iso_now()} + self.events.record(channel_id=channel_id, kind="adapter_started") + if old_adapter is not None and old_adapter is not adapter: + await old_adapter.stop() + + async def remove_channel(self, channel_id: str) -> None: + async with self._lifecycle_lock: + adapter = self.adapters.pop(channel_id, None) + self.manager.unregister(channel_id) + self.channel_configs.pop(channel_id, None) + if adapter is not None: + await adapter.stop() + self.events.record(channel_id=channel_id, kind="adapter_stopped") + self.states[channel_id] = {"state": "removed", "last_error": None} +``` + +If this direct implementation deadlocks because `add_channel()` calls `remove_channel()` under the same lock, split the locked removal body into a private `_remove_channel_locked()` helper and call that from both public methods. + +- [ ] **Step 5: Run dynamic runtime tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime_dynamic_channels.py tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 6: Commit Task 3** + +```bash +git add app-instance/backend/beaver/interfaces/channels/manager.py app-instance/backend/beaver/interfaces/channels/runtime.py app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py +git commit -m "feat: support dynamic runtime channels" +``` + +--- + +### Task 4: Runtime Factory For External Connector Channel + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py` +- Test: `app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py` + +- [ ] **Step 1: Extend dynamic runtime tests** + +Append to `app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py`: + +```python +def test_runtime_builds_external_connector_channel(tmp_path, monkeypatch) -> None: + async def run() -> None: + monkeypatch.setenv("EXTERNAL_CONNECTOR_TOKEN", "connector-token") + runtime = ChannelRuntime(service=FakeService(), workspace=tmp_path, channels={}, bus=MessageBus()) + await runtime.start() + try: + await runtime.add_channel( + "weixin-main", + ChannelConfig( + enabled=True, + kind="external_connector", + mode="http", + account_id="weixin:me", + display_name="Weixin Main", + config={ + "platformKind": "weixin", + "connectionId": "conn_1", + "sidecarBaseUrl": "http://external-connector:8787", + }, + ), + ) + adapter = runtime.adapters["weixin-main"] + assert adapter.kind == "external_connector" + assert adapter.mode == "http" + assert getattr(adapter, "platform_kind") == "weixin" + finally: + await runtime.stop() + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run test to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime_dynamic_channels.py::test_runtime_builds_external_connector_channel -q +``` + +Expected: fail with `ValueError: Unsupported channel kind/mode: external_connector/http`. + +- [ ] **Step 3: Add runtime factory branch** + +Modify `_build_adapter()` in `app-instance/backend/beaver/interfaces/channels/runtime.py` before the final `raise`: + +```python + if cfg.kind == "external_connector" and cfg.mode == "http": + import os + + from beaver.interfaces.channels.connections.sidecar_client import ConnectorSidecarClient + from beaver.interfaces.channels.external_connector import ExternalConnectorChannel + + base_url = str(cfg.config.get("sidecarBaseUrl") or os.getenv("EXTERNAL_CONNECTOR_BASE_URL") or "").strip() + token = os.getenv("EXTERNAL_CONNECTOR_TOKEN", "") + platform_kind = str(cfg.config.get("platformKind") or "").strip() + connection_id = str(cfg.config.get("connectionId") or "").strip() + if not base_url: + raise ValueError("external connector sidecarBaseUrl is required") + if not platform_kind: + raise ValueError("external connector platformKind is required") + if not connection_id: + raise ValueError("external connector connectionId is required") + return ExternalConnectorChannel( + channel_id=channel_id, + platform_kind=platform_kind, + connection_id=connection_id, + account_id=cfg.account_id, + display_name=cfg.display_name, + sidecar_client=ConnectorSidecarClient(base_url=base_url, token=token), + ) +``` + +- [ ] **Step 4: Run tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime_dynamic_channels.py tests/unit/test_external_connector_channel.py -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 5: Commit Task 4** + +```bash +git add app-instance/backend/beaver/interfaces/channels/runtime.py app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py +git commit -m "feat: materialize external connector channels" +``` + +--- + +### Task 5: Bridge API + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/web/schemas/chat.py` +- Modify: `app-instance/backend/beaver/interfaces/web/schemas/__init__.py` +- Modify: `app-instance/backend/beaver/interfaces/web/app.py` +- Test: `app-instance/backend/tests/unit/test_external_connector_bridge_api.py` + +- [ ] **Step 1: Write failing bridge API tests** + +Create `app-instance/backend/tests/unit/test_external_connector_bridge_api.py`: + +```python +from __future__ import annotations + +from fastapi.testclient import TestClient + +from beaver.interfaces.channels.connections import ChannelConnectionStore, CredentialStore +from beaver.interfaces.web.app import create_app +from beaver.services.agent_service import AgentService + + +def _app(tmp_path, monkeypatch): + monkeypatch.setenv("BEAVER_BRIDGE_TOKEN", "bridge-token") + config_path = tmp_path / "config.json" + config_path.write_text( + '{"agents": {"defaults": {"workspace": "%s"}}, "providers": {}}' % str(tmp_path), + encoding="utf-8", + ) + service = AgentService(config_path=config_path) + app = create_app(service=service, manage_service_lifecycle=False) + return app, service + + +def test_bridge_endpoint_accepts_valid_event(tmp_path, monkeypatch) -> None: + app, service = _app(tmp_path, monkeypatch) + state_dir = tmp_path / "state" / "channel_connections" + store = ChannelConnectionStore(state_dir / "connections.json") + connection = store.create( + kind="weixin", + mode="sidecar", + display_name="Weixin Main", + account_id="weixin:me", + owner_user_id=None, + auth_type="connector_session", + ) + store.update_status(connection.connection_id, status="connected", last_error=None) + try: + with TestClient(app) as client: + response = client.post( + "/api/channel-connector-bridge/events", + headers={"Authorization": "Bearer bridge-token"}, + json={ + "eventId": "evt-1", + "timestamp": "2026-06-02T09:30:00Z", + "deliveryAttempt": 1, + "connectionId": connection.connection_id, + "channelId": connection.channel_id, + "kind": "weixin", + "accountId": "weixin:me", + "peerId": "peer-1", + "peerType": "dm", + "userId": "sender-1", + "threadId": None, + "messageId": "msg-1", + "messageType": "text", + "content": "hello", + "metadata": {}, + }, + ) + assert response.status_code == 200 + assert response.json()["accepted"] is True + finally: + service.close() + + +def test_bridge_endpoint_rejects_invalid_token(tmp_path, monkeypatch) -> None: + app, service = _app(tmp_path, monkeypatch) + try: + with TestClient(app) as client: + response = client.post("/api/channel-connector-bridge/events", headers={"Authorization": "Bearer wrong"}, json={}) + assert response.status_code == 401 + finally: + service.close() + + +def test_bridge_endpoint_returns_conflict_for_processing_duplicate(tmp_path, monkeypatch) -> None: + app, service = _app(tmp_path, monkeypatch) + state_dir = tmp_path / "state" / "channel_connections" + store = ChannelConnectionStore(state_dir / "connections.json") + connection = store.create( + kind="weixin", + mode="sidecar", + display_name="Weixin Main", + account_id="weixin:me", + owner_user_id=None, + auth_type="connector_session", + ) + store.update_status(connection.connection_id, status="connected", last_error=None) + payload = { + "eventId": "evt-1", + "timestamp": "2026-06-02T09:30:00Z", + "deliveryAttempt": 1, + "connectionId": connection.connection_id, + "channelId": connection.channel_id, + "kind": "weixin", + "accountId": "weixin:me", + "peerId": "peer-1", + "peerType": "dm", + "userId": "sender-1", + "threadId": None, + "messageId": "msg-1", + "messageType": "text", + "content": "hello", + "metadata": {}, + } + try: + with TestClient(app) as client: + first = client.post("/api/channel-connector-bridge/events", headers={"Authorization": "Bearer bridge-token"}, json=payload) + second = client.post("/api/channel-connector-bridge/events", headers={"Authorization": "Bearer bridge-token"}, json={**payload, "deliveryAttempt": 2}) + assert first.status_code == 200 + assert second.status_code in {200, 409} + finally: + service.close() +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_bridge_api.py -q +``` + +Expected: fail with `404 Not Found` for `/api/channel-connector-bridge/events`. + +- [ ] **Step 3: Add bridge schemas** + +Append to `app-instance/backend/beaver/interfaces/web/schemas/chat.py`: + +```python +class WebConnectorBridgeEventRequest(BaseModel): + event_id: str = Field(alias="eventId") + timestamp: str + delivery_attempt: int = Field(default=1, alias="deliveryAttempt") + connection_id: str = Field(alias="connectionId") + channel_id: str = Field(alias="channelId") + kind: str + account_id: str = Field(alias="accountId") + peer_id: str = Field(alias="peerId") + peer_type: str = Field(default="unknown", alias="peerType") + user_id: str | None = Field(default=None, alias="userId") + thread_id: str | None = Field(default=None, alias="threadId") + message_id: str = Field(alias="messageId") + message_type: str = Field(default="text", alias="messageType") + content: str + metadata: dict[str, Any] = Field(default_factory=dict) + + +class WebConnectorBridgeEventResponse(BaseModel): + accepted: bool + duplicate: bool = False + pending: bool = False + retry_after_seconds: int | None = Field(default=None, alias="retryAfterSeconds") +``` + +Export both in `app-instance/backend/beaver/interfaces/web/schemas/__init__.py`. + +- [ ] **Step 4: Add bridge endpoint** + +Modify `app-instance/backend/beaver/interfaces/web/app.py` imports to include: + +```python +from beaver.foundation.events import ChannelIdentity, InboundMessage +from beaver.interfaces.channels.connections import MessageDedupeStore +``` + +Add helpers: + +```python +def _bridge_token() -> str: + return os.getenv("BEAVER_BRIDGE_TOKEN", "") + + +def _message_dedupe_store(workspace: Path) -> MessageDedupeStore: + return MessageDedupeStore(_connection_state_dir(workspace) / "message_dedupe.json") +``` + +Add the route inside `create_app()`: + +```python + @app.post("/api/channel-connector-bridge/events", response_model=WebConnectorBridgeEventResponse) + async def accept_connector_bridge_event( + request: Request, + payload: WebConnectorBridgeEventRequest, + authorization: str | None = Header(default=None), + ) -> JSONResponse | WebConnectorBridgeEventResponse: + expected = _bridge_token() + if not expected or authorization != f"Bearer {expected}": + raise HTTPException(status_code=401, detail="Invalid connector bridge token") + registry = get_channel_connector_registry(request) + try: + connection = registry.connection_store.get(payload.connection_id) + except KeyError: + raise HTTPException(status_code=404, detail="Channel connection not found") + if connection.status == "revoked": + raise HTTPException(status_code=404, detail="Channel connection not found") + store = _message_dedupe_store(_channel_connection_workspace(request)) + begin = store.begin( + connection_id=payload.connection_id, + event_id=payload.event_id, + delivery_attempt=payload.delivery_attempt, + ) + if not begin.should_process: + body = WebConnectorBridgeEventResponse( + accepted=begin.http_status == 200, + duplicate=True, + pending=begin.http_status == 409, + retryAfterSeconds=begin.retry_after_seconds, + ).model_dump(by_alias=True) + return JSONResponse(status_code=begin.http_status, content=body) + runtime = get_channel_runtime(request) + identity = ChannelIdentity( + channel_id=payload.channel_id, + kind=payload.kind, + account_id=payload.account_id, + peer_id=payload.peer_id, + thread_id=payload.thread_id, + peer_type=payload.peer_type, + user_id=payload.user_id, + message_id=payload.message_id, + ) + inbound = InboundMessage( + channel=payload.channel_id, + content=payload.content, + content_type=payload.message_type, + channel_identity=identity, + user_id=payload.user_id, + message_id=payload.message_id, + metadata=dict(payload.metadata), + ) + result = await runtime.accept_inbound(inbound) + if result.accepted: + store.complete(begin.dedupe_key, message_id=payload.message_id) + else: + store.fail(begin.dedupe_key, error=result.error or "runtime rejected bridge event") + return WebConnectorBridgeEventResponse(accepted=result.accepted, duplicate=result.duplicate, pending=result.pending) +``` + +If `_channel_connection_workspace(request)` does not exist yet, add it: + +```python +def _channel_connection_workspace(request: Request) -> Path: + workspace = getattr(request.app.state, "channel_connection_workspace", None) + if workspace is not None: + return Path(workspace) + return Path(get_agent_service(request).loader.workspace) +``` + +- [ ] **Step 5: Run bridge tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_bridge_api.py -q +``` + +Expected: all listed tests pass. If the duplicate test returns `200` because runtime completes before the second request, add a focused `MessageDedupeStore` API test for `409`; do not weaken the store behavior. + +- [ ] **Step 6: Commit Task 5** + +```bash +git add app-instance/backend/beaver/interfaces/web/app.py app-instance/backend/beaver/interfaces/web/schemas/chat.py app-instance/backend/beaver/interfaces/web/schemas/__init__.py app-instance/backend/tests/unit/test_external_connector_bridge_api.py +git commit -m "feat: add external connector bridge api" +``` + +--- + +### Task 6: Weixin And Feishu Connectors + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/connections/external.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/connections/__init__.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/connections/connectors.py` +- Modify: `app-instance/backend/beaver/interfaces/web/schemas/chat.py` +- Modify: `app-instance/backend/beaver/interfaces/web/schemas/__init__.py` +- Modify: `app-instance/backend/beaver/interfaces/web/app.py` +- Test: `app-instance/backend/tests/unit/test_external_sidecar_connectors.py` + +- [ ] **Step 1: Write failing connector tests** + +Create `app-instance/backend/tests/unit/test_external_sidecar_connectors.py`: + +```python +from __future__ import annotations + +import asyncio + +from beaver.interfaces.channels.connections import ( + ChannelConnectionStore, + CredentialStore, + FeishuConnector, + WeixinConnector, +) + + +class FakeSidecarClient: + def __init__(self) -> None: + self.sessions: dict[str, dict] = {} + self.started: list[dict] = [] + self.logged_out: list[str] = [] + + async def start_session(self, payload: dict) -> dict: + self.started.append(payload) + session = { + "sessionId": "cs_1", + "kind": payload["kind"], + "status": "qr_ready", + "qrImage": "data:image/png;base64,abc", + "accountId": None, + "displayName": None, + "metadata": {}, + } + self.sessions["cs_1"] = session + return session + + async def get_session(self, session_id: str) -> dict: + return self.sessions[session_id] + + async def logout(self, connection_id: str) -> dict: + self.logged_out.append(connection_id) + return {"ok": True} + + +def test_weixin_connector_starts_connector_session(tmp_path) -> None: + async def run() -> None: + connection_store = ChannelConnectionStore(tmp_path / "connections.json") + credential_store = CredentialStore(tmp_path / "credentials.json") + client = FakeSidecarClient() + connector = WeixinConnector( + connection_store=connection_store, + credential_store=credential_store, + sidecar_client=client, + sidecar_base_url="http://external-connector:8787", + ) + + view = await connector.start_session(display_name="Weixin Main", owner_user_id="user-1", options={}) + + assert view["sessionId"] == "cs_1" + assert client.started[0]["kind"] == "weixin" + assert client.started[0]["connectionId"].startswith("conn_") + assert connection_store.list()[0].kind == "weixin" + assert connection_store.list()[0].status == "pairing" + + asyncio.run(run()) + + +def test_weixin_connector_poll_connected_materializes_external_runtime(tmp_path) -> None: + async def run() -> None: + connection_store = ChannelConnectionStore(tmp_path / "connections.json") + credential_store = CredentialStore(tmp_path / "credentials.json") + client = FakeSidecarClient() + connector = WeixinConnector( + connection_store=connection_store, + credential_store=credential_store, + sidecar_client=client, + sidecar_base_url="http://external-connector:8787", + ) + await connector.start_session(display_name="Weixin Main", owner_user_id=None, options={}) + connection = connection_store.list()[0] + client.sessions["cs_1"] = { + "sessionId": "cs_1", + "kind": "weixin", + "status": "connected", + "accountId": "weixin:me", + "displayName": "Me", + "metadata": {"stateRef": "state-1"}, + } + + result = await connector.poll_session("cs_1") + updated = connection_store.get(connection.connection_id) + spec = await connector.materialize_runtime(connection.connection_id) + + assert result["status"] == "connected" + assert updated.status == "connected" + assert updated.account_id == "weixin:me" + assert spec.kind == "external_connector" + assert spec.mode == "http" + assert spec.config["platformKind"] == "weixin" + assert spec.config["sidecarBaseUrl"] == "http://external-connector:8787" + + asyncio.run(run()) + + +def test_feishu_connector_uses_feishu_kind(tmp_path) -> None: + async def run() -> None: + connection_store = ChannelConnectionStore(tmp_path / "connections.json") + credential_store = CredentialStore(tmp_path / "credentials.json") + client = FakeSidecarClient() + connector = FeishuConnector( + connection_store=connection_store, + credential_store=credential_store, + sidecar_client=client, + sidecar_base_url="http://external-connector:8787", + ) + + await connector.start_session(display_name="Feishu Main", owner_user_id=None, options={"domain": "feishu"}) + + assert client.started[0]["kind"] == "feishu" + assert client.started[0]["options"] == {"domain": "feishu"} + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_sidecar_connectors.py -q +``` + +Expected: fail with `ImportError: cannot import name 'WeixinConnector'`. + +- [ ] **Step 3: Implement connector classes** + +Create `app-instance/backend/beaver/interfaces/channels/connections/external.py`: + +```python +from __future__ import annotations + +from typing import Any + +from .models import ChannelRuntimeSpec, ValidationResult +from .sidecar_client import ConnectorSidecarClient +from .store import ChannelConnectionStore, CredentialStore + + +class ExternalConnectorBase: + kind = "" + capabilities: list[str] = [] + + def __init__( + self, + *, + connection_store: ChannelConnectionStore, + credential_store: CredentialStore, + sidecar_client: ConnectorSidecarClient | Any, + sidecar_base_url: str, + ) -> None: + self.connection_store = connection_store + self.credential_store = credential_store + self.sidecar_client = sidecar_client + self.sidecar_base_url = sidecar_base_url + + async def start_session( + self, + *, + display_name: str, + owner_user_id: str | None, + options: dict[str, Any], + ) -> dict[str, Any]: + connection = self.connection_store.create( + kind=self.kind, + mode="sidecar", + display_name=display_name or self.kind, + account_id="", + owner_user_id=owner_user_id, + auth_type="connector_session", + runtime_config={"sidecarBaseUrl": self.sidecar_base_url}, + capabilities=list(self.capabilities), + ) + self.connection_store.update_status(connection.connection_id, status="pairing", last_error=None) + payload = { + "kind": self.kind, + "connectionId": connection.connection_id, + "channelId": connection.channel_id, + "displayName": connection.display_name, + "callbackBaseUrl": "", + "options": dict(options), + } + view = await self.sidecar_client.start_session(payload) + connection.pairing_session_id = str(view.get("sessionId") or "") + self.connection_store.update(connection) + return view + + async def poll_session(self, session_id: str) -> dict[str, Any]: + view = await self.sidecar_client.get_session(session_id) + connection = self._connection_for_session(session_id) + status = str(view.get("status") or "") + if status == "connected": + connection.account_id = str(view.get("accountId") or connection.account_id) + connection.display_name = str(view.get("displayName") or connection.display_name) + metadata = view.get("metadata") if isinstance(view.get("metadata"), dict) else {} + state_ref = metadata.get("stateRef") + if state_ref: + connection.credentials_ref = self.credential_store.put(kind=self.kind, values={"stateRef": state_ref}) + self.connection_store.update(connection) + self.connection_store.update_status(connection.connection_id, status="connected", last_error=None) + elif status in {"expired", "error", "cancelled"}: + self.connection_store.update_status(connection.connection_id, status="error", last_error=str(view.get("error") or status)) + return view + + async def validate(self, connection_id: str) -> ValidationResult: + connection = self.connection_store.get(connection_id) + if connection.status in {"connected", "running"}: + return ValidationResult(ok=True, status="connected", account_id=connection.account_id, display_name=connection.display_name) + return ValidationResult(ok=False, status=connection.status, error=connection.last_error) + + async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec: + connection = self.connection_store.get(connection_id) + return ChannelRuntimeSpec( + channel_id=connection.channel_id, + kind="external_connector", + mode="http", + account_id=connection.account_id, + display_name=connection.display_name, + config={ + "platformKind": self.kind, + "connectionId": connection.connection_id, + "sidecarBaseUrl": connection.runtime_config.get("sidecarBaseUrl") or self.sidecar_base_url, + }, + secrets_ref=None, + ) + + async def revoke(self, connection_id: str) -> None: + await self.sidecar_client.logout(connection_id) + + def _connection_for_session(self, session_id: str): + for connection in self.connection_store.list(): + if connection.pairing_session_id == session_id: + return connection + raise KeyError(session_id) + + +class WeixinConnector(ExternalConnectorBase): + kind = "weixin" + capabilities = ["receive_text", "send_text", "receive_media", "direct_messages"] + + +class FeishuConnector(ExternalConnectorBase): + kind = "feishu" + capabilities = ["receive_text", "send_text", "receive_media", "groups"] +``` + +- [ ] **Step 4: Export connectors and update registry materialization** + +Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`: + +```python +from .external import ExternalConnectorBase, FeishuConnector, WeixinConnector +``` + +Add names to `__all__`. + +Ensure `ChannelConnectorRegistry.materialize_channel_configs()` accepts `ChannelRuntimeSpec(kind="external_connector", mode="http")` and emits `ChannelConfig(secrets={})`. + +- [ ] **Step 5: Add connector session web schemas** + +Append to `app-instance/backend/beaver/interfaces/web/schemas/chat.py`: + +```python +class WebConnectorSessionCreateRequest(BaseModel): + kind: str + display_name: str | None = Field(default=None, alias="displayName") + owner_user_id: str | None = Field(default=None, alias="ownerUserId") + options: dict[str, Any] = Field(default_factory=dict) + + +class WebConnectorSessionResponse(BaseModel): + session: dict[str, Any] + connection: dict[str, Any] | None = None +``` + +Export both in `app-instance/backend/beaver/interfaces/web/schemas/__init__.py`. + +- [ ] **Step 6: Register Weixin/Feishu in app registry** + +Modify `_build_channel_connector_registry()` in `app-instance/backend/beaver/interfaces/web/app.py` to create a sidecar client: + +```python + sidecar_base_url = os.getenv("EXTERNAL_CONNECTOR_BASE_URL", "http://external-connector:8787") + sidecar_token = os.getenv("EXTERNAL_CONNECTOR_TOKEN", "") + sidecar_client = ConnectorSidecarClient(base_url=sidecar_base_url, token=sidecar_token) + registry.register(WeixinConnector( + connection_store=connection_store, + credential_store=credential_store, + sidecar_client=sidecar_client, + sidecar_base_url=sidecar_base_url, + )) + registry.register(FeishuConnector( + connection_store=connection_store, + credential_store=credential_store, + sidecar_client=sidecar_client, + sidecar_base_url=sidecar_base_url, + )) +``` + +Keep Telegram registration unchanged. + +- [ ] **Step 7: Add connector session API routes** + +Add routes near existing channel connection APIs in `app-instance/backend/beaver/interfaces/web/app.py`: + +```python + @app.post("/api/channel-connector-sessions", response_model=WebConnectorSessionResponse) + async def start_channel_connector_session( + request: Request, + payload: WebConnectorSessionCreateRequest, + ) -> WebConnectorSessionResponse: + registry = get_channel_connector_registry(request) + connector = registry.connector_for_kind(_clean_text(payload.kind)) + if not hasattr(connector, "start_session"): + raise HTTPException(status_code=400, detail="Connector does not support sessions") + view = await connector.start_session( + display_name=_clean_text(payload.display_name) or _clean_text(payload.kind), + owner_user_id=_clean_text(payload.owner_user_id) or None, + options=payload.options, + ) + connection = registry.connection_store.get(str(view.get("connectionId") or registry.connection_store.list()[-1].connection_id)) + return WebConnectorSessionResponse(session=view, connection=_connection_response_view(connection)) + + @app.get("/api/channel-connector-sessions/{session_id}", response_model=WebConnectorSessionResponse) + async def get_channel_connector_session(session_id: str, request: Request) -> WebConnectorSessionResponse: + registry = get_channel_connector_registry(request) + connection = next((item for item in registry.connection_store.list() if item.pairing_session_id == session_id), None) + if connection is None: + raise HTTPException(status_code=404, detail="Connector session not found") + connector = registry.connector_for_kind(connection.kind) + if not hasattr(connector, "poll_session"): + raise HTTPException(status_code=400, detail="Connector does not support sessions") + view = await connector.poll_session(session_id) + connection = registry.connection_store.get(connection.connection_id) + if connection.status == "connected": + runtime = get_channel_runtime(request) + config = (await registry.materialize_channel_configs())[connection.channel_id] + await runtime.add_channel(connection.channel_id, config) + return WebConnectorSessionResponse(session=view, connection=_connection_response_view(connection)) +``` + +Add `connector_for_kind()` to `ChannelConnectorRegistry`: + +```python + def connector_for_kind(self, kind: str) -> ChannelConnector: + return self._connector(kind) +``` + +- [ ] **Step 8: Run connector tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_sidecar_connectors.py tests/unit/test_channel_connection_api.py tests/unit/test_channel_connector_registry.py -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 9: Commit Task 6** + +```bash +git add app-instance/backend/beaver/interfaces/channels/connections app-instance/backend/beaver/interfaces/web/app.py app-instance/backend/beaver/interfaces/web/schemas/chat.py app-instance/backend/beaver/interfaces/web/schemas/__init__.py app-instance/backend/tests/unit/test_external_sidecar_connectors.py +git commit -m "feat: add sidecar-backed channel connectors" +``` + +--- + +### Task 7: Final Backend Verification + +**Files:** +- Review: `docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md` + +- [ ] **Step 1: Run focused backend tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest \ + tests/unit/test_connector_message_dedupe_store.py \ + tests/unit/test_external_connector_channel.py \ + tests/unit/test_channel_runtime_dynamic_channels.py \ + tests/unit/test_external_connector_bridge_api.py \ + tests/unit/test_external_sidecar_connectors.py \ + tests/unit/test_channel_connection_store.py \ + tests/unit/test_channel_connector_registry.py \ + tests/unit/test_channel_connection_api.py \ + tests/unit/test_channel_runtime.py \ + tests/unit/test_gateway_channels.py \ + -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 2: Run import tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_imports.py -q +``` + +Expected: all import tests pass. + +- [ ] **Step 3: Scan for leaked service tokens in implementation** + +Run: + +```bash +cd app-instance/backend +rg -n "bridge-token|connector-token|token-1|token-2|secret-token" beaver || true +``` + +Expected: no implementation files contain fixture token values. + +- [ ] **Step 4: Commit verification-only fixes if needed** + +If Step 1 or Step 2 required a small fix, commit it: + +```bash +git add app-instance/backend +git commit -m "fix: stabilize external connector backend runtime" +``` + +If no files changed, do not create an empty commit. diff --git a/docs/superpowers/plans/2026-06-03-external-connector-frontend-deploy.md b/docs/superpowers/plans/2026-06-03-external-connector-frontend-deploy.md new file mode 100644 index 0000000..27fbb7d --- /dev/null +++ b/docs/superpowers/plans/2026-06-03-external-connector-frontend-deploy.md @@ -0,0 +1,790 @@ +# External Connector Frontend And Deploy Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a connector-driven onboarding UI for Weixin and Feishu/Lark, wire frontend API helpers to backend connector-session APIs, and verify the docker-compose sidecar deployment path. + +**Architecture:** The Status page keeps the existing advanced channel config editor, but adds a connector onboarding section backed by `/api/channel-connectors`, `/api/channel-connections`, and `/api/channel-connector-sessions`. Weixin shows QR status; Feishu/Lark shows provider instructions/status. Successful sessions become active without restart through backend dynamic runtime activation. + +**Tech Stack:** Next.js 13, React, TypeScript, existing shadcn/Radix UI components, lucide-react, Vitest, Docker Compose. + +--- + +## Dependencies + +Execute after: + +- `docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md` +- `docs/superpowers/plans/2026-06-03-external-connector-sidecar.md` + +## Scope + +Included: + +- Frontend TypeScript API helpers and types for connectors, connections, and connector sessions. +- Status page connector onboarding UI. +- QR/instruction modal and polling. +- Logout/revoke action using existing connection revoke API. +- Frontend tests for API mapping and UI state helpers. +- Docker compose smoke verification instructions for local sidecar. + +Excluded: + +- Replacing the advanced `/api/channels` static config editor. +- Live vendor account verification logic inside frontend. +- New top-level navigation route. + +## File Structure + +- Modify `app-instance/frontend/types/index.ts` + - Add connector and connector-session types. +- Modify `app-instance/frontend/lib/api.ts` + - Add connector API functions. +- Create `app-instance/frontend/lib/channel-connectors.ts` + - Small UI state helpers for connector labels/status. +- Create `app-instance/frontend/components/channel-connector-wizard.tsx` + - Connector cards, session modal, QR/instruction rendering, poll controls. +- Modify `app-instance/frontend/app/(app)/status/page.tsx` + - Fetch connector data and render wizard above advanced Channels list. +- Create `app-instance/frontend/lib/channel-connectors.test.ts` + - Helper tests. +- Create `app-instance/frontend/components/channel-connector-wizard.test.tsx` + - Component tests if the existing Vitest setup supports React Testing Library; otherwise keep helper tests and verify with typecheck/build. +- Review `docker-compose.external-connectors.yml` + - Confirm sidecar env names match backend and frontend assumptions. + +--- + +### Task 1: Frontend Types And API Client + +**Files:** +- Modify: `app-instance/frontend/types/index.ts` +- Modify: `app-instance/frontend/lib/api.ts` +- Test: `app-instance/frontend/lib/channel-connectors.test.ts` + +- [ ] **Step 1: Add frontend connector types** + +Append to `app-instance/frontend/types/index.ts`: + +```ts +export interface ChannelConnectorDescriptor { + kind: string; + displayName?: string; + display_name?: string; + authType?: string; + auth_type?: string; + providerId?: string; + provider_id?: string; + capabilities?: string[]; + available?: boolean; + unavailableReason?: string | null; +} + +export interface ChannelConnectionView { + connection_id: string; + owner_user_id?: string | null; + channel_id: string; + kind: string; + mode: string; + display_name: string; + account_id: string; + status: string; + auth_type: string; + runtime_config: Record; + capabilities: string[]; + created_at: string; + updated_at: string; + last_seen_at?: string | null; + last_error?: string | null; +} + +export interface ChannelConnectionResponse { + connection: ChannelConnectionView; + credentials?: Record; +} + +export interface ConnectorSessionView { + sessionId: string; + kind: string; + status: string; + qrCode?: string | null; + qrImage?: string | null; + instructions?: string[]; + accountId?: string | null; + displayName?: string | null; + error?: string | null; + metadata?: Record; +} + +export interface ConnectorSessionResponse { + session: ConnectorSessionView; + connection?: ChannelConnectionView | null; +} +``` + +- [ ] **Step 2: Add API imports** + +Modify the import list in `app-instance/frontend/lib/api.ts` to include: + +```ts + ChannelConnectionResponse, + ChannelConnectionView, + ChannelConnectorDescriptor, + ConnectorSessionResponse, +``` + +- [ ] **Step 3: Add connector API functions** + +Append to `app-instance/frontend/lib/api.ts` near the channel API functions: + +```ts +export async function listChannelConnectors(): Promise { + return fetchJSON('/api/channel-connectors'); +} + +export async function listChannelConnections(): Promise { + return fetchJSON('/api/channel-connections'); +} + +export async function startConnectorSession(params: { + kind: string; + displayName?: string; + ownerUserId?: string; + options?: Record; +}): Promise { + return fetchJSON('/api/channel-connector-sessions', { + method: 'POST', + timeoutMs: 45000, + body: JSON.stringify({ + kind: params.kind, + displayName: params.displayName, + ownerUserId: params.ownerUserId, + options: params.options || {}, + }), + }); +} + +export async function getConnectorSession(sessionId: string): Promise { + return fetchJSON(`/api/channel-connector-sessions/${encodeURIComponent(sessionId)}`, { + timeoutMs: 45000, + }); +} + +export async function revokeChannelConnection(connectionId: string): Promise { + return fetchJSON(`/api/channel-connections/${encodeURIComponent(connectionId)}/revoke`, { + method: 'POST', + }); +} +``` + +- [ ] **Step 4: Run frontend typecheck** + +Run: + +```bash +cd app-instance/frontend +npm run typecheck +``` + +Expected: typecheck passes. If it fails because these types are appended inside another interface, move them below the closing brace for `SystemStatus`. + +- [ ] **Step 5: Commit Task 1** + +```bash +git add app-instance/frontend/types/index.ts app-instance/frontend/lib/api.ts +git commit -m "feat: add connector frontend api client" +``` + +--- + +### Task 2: Connector UI Helpers + +**Files:** +- Create: `app-instance/frontend/lib/channel-connectors.ts` +- Create: `app-instance/frontend/lib/channel-connectors.test.ts` + +- [ ] **Step 1: Write helper tests** + +Create `app-instance/frontend/lib/channel-connectors.test.ts`: + +```ts +import { describe, expect, it } from 'vitest'; +import { + connectorDisplayName, + connectorStatusLabel, + isTerminalConnectorSessionStatus, +} from './channel-connectors'; + +describe('channel connector helpers', () => { + it('returns friendly connector names', () => { + expect(connectorDisplayName({ kind: 'weixin' })).toBe('Weixin'); + expect(connectorDisplayName({ kind: 'feishu' })).toBe('Feishu/Lark'); + expect(connectorDisplayName({ kind: 'telegram', displayName: 'Telegram' })).toBe('Telegram'); + }); + + it('maps connector session statuses', () => { + expect(connectorStatusLabel('qr_ready')).toBe('QR ready'); + expect(connectorStatusLabel('waiting_for_user')).toBe('Waiting for user'); + expect(connectorStatusLabel('connected')).toBe('Connected'); + }); + + it('detects terminal statuses', () => { + expect(isTerminalConnectorSessionStatus('connected')).toBe(true); + expect(isTerminalConnectorSessionStatus('expired')).toBe(true); + expect(isTerminalConnectorSessionStatus('qr_ready')).toBe(false); + }); +}); +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/frontend +npm run test -- lib/channel-connectors.test.ts +``` + +Expected: fail with `Cannot find module './channel-connectors'`. + +- [ ] **Step 3: Implement helpers** + +Create `app-instance/frontend/lib/channel-connectors.ts`: + +```ts +import type { ChannelConnectorDescriptor } from '@/types'; + +export function connectorDisplayName(connector: Pick): string { + if (connector.displayName) return connector.displayName; + if (connector.display_name) return connector.display_name; + if (connector.kind === 'weixin') return 'Weixin'; + if (connector.kind === 'feishu') return 'Feishu/Lark'; + if (connector.kind === 'telegram') return 'Telegram'; + return connector.kind; +} + +export function connectorStatusLabel(status: string): string { + const labels: Record = { + pending: 'Pending', + qr_ready: 'QR ready', + scanned: 'Scanned', + confirmed: 'Confirmed', + installing: 'Installing', + waiting_for_user: 'Waiting for user', + connected: 'Connected', + expired: 'Expired', + error: 'Error', + cancelled: 'Cancelled', + }; + return labels[status] || status; +} + +export function isTerminalConnectorSessionStatus(status: string): boolean { + return ['connected', 'expired', 'error', 'cancelled'].includes(status); +} +``` + +- [ ] **Step 4: Run helper tests** + +Run: + +```bash +cd app-instance/frontend +npm run test -- lib/channel-connectors.test.ts +``` + +Expected: helper tests pass. + +- [ ] **Step 5: Commit Task 2** + +```bash +git add app-instance/frontend/lib/channel-connectors.ts app-instance/frontend/lib/channel-connectors.test.ts +git commit -m "feat: add channel connector ui helpers" +``` + +--- + +### Task 3: Connector Wizard Component + +**Files:** +- Create: `app-instance/frontend/components/channel-connector-wizard.tsx` +- Modify: `app-instance/frontend/app/(app)/status/page.tsx` + +- [ ] **Step 1: Create wizard component** + +Create `app-instance/frontend/components/channel-connector-wizard.tsx`: + +```tsx +'use client'; + +import React, { useEffect, useMemo, useState } from 'react'; +import { CheckCircle2, Loader2, QrCode, RefreshCw, Unplug } from 'lucide-react'; +import type { + ChannelConnectionView, + ChannelConnectorDescriptor, + ConnectorSessionResponse, + ConnectorSessionView, +} from '@/types'; +import { + getConnectorSession, + revokeChannelConnection, + startConnectorSession, +} from '@/lib/api'; +import { + connectorDisplayName, + connectorStatusLabel, + isTerminalConnectorSessionStatus, +} from '@/lib/channel-connectors'; +import { Button } from '@/components/ui/button'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { + Dialog, + DialogContent, + DialogFooter, + DialogHeader, + DialogTitle, +} from '@/components/ui/dialog'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; + +type Props = { + connectors: ChannelConnectorDescriptor[]; + connections: ChannelConnectionView[]; + onChanged: () => Promise | void; +}; + +export function ChannelConnectorWizard({ connectors, connections, onChanged }: Props) { + const [activeKind, setActiveKind] = useState(null); + const [session, setSession] = useState(null); + const [connection, setConnection] = useState(null); + const [busy, setBusy] = useState(false); + const [error, setError] = useState(null); + const [feishuDomain, setFeishuDomain] = useState('feishu'); + + const visibleConnectors = useMemo( + () => connectors.filter((item) => ['telegram', 'weixin', 'feishu'].includes(item.kind)), + [connectors], + ); + + useEffect(() => { + if (!session || isTerminalConnectorSessionStatus(session.status)) return; + const timer = window.setInterval(async () => { + try { + const next = await getConnectorSession(session.sessionId); + setSession(next.session); + if (next.connection) setConnection(next.connection); + if (next.session.status === 'connected') await onChanged(); + } catch (err: any) { + setError(err.message || 'Failed to refresh connector session'); + } + }, 2000); + return () => window.clearInterval(timer); + }, [session?.sessionId, session?.status, onChanged]); + + const start = async (kind: string) => { + setActiveKind(kind); + setSession(null); + setConnection(null); + setError(null); + setBusy(true); + try { + const options = kind === 'feishu' ? { domain: feishuDomain } : {}; + const response: ConnectorSessionResponse = await startConnectorSession({ + kind, + displayName: connectorDisplayName({ kind }), + options, + }); + setSession(response.session); + setConnection(response.connection || null); + } catch (err: any) { + setError(err.message || 'Failed to start connector session'); + } finally { + setBusy(false); + } + }; + + const revoke = async (item: ChannelConnectionView) => { + setBusy(true); + setError(null); + try { + await revokeChannelConnection(item.connection_id); + await onChanged(); + } catch (err: any) { + setError(err.message || 'Failed to logout connector'); + } finally { + setBusy(false); + } + }; + + return ( +
+
+ {visibleConnectors.map((connector) => { + const existing = connections.find((item) => item.kind === connector.kind && item.status !== 'revoked'); + return ( + + + + {connectorDisplayName(connector)} + {existing ? {existing.status} : null} + + + + {connector.kind === 'feishu' ? ( +
+ + setFeishuDomain(event.target.value)} /> +
+ ) : null} + {existing ? ( +
+ {existing.display_name || existing.account_id || existing.channel_id} + +
+ ) : ( + + )} +
+
+ ); + })} +
+ + {error ?

{error}

: null} + + !open && setActiveKind(null)}> + + + {activeKind ? connectorDisplayName({ kind: activeKind }) : 'Connector'} + + {session ? ( +
+
+ + {connectorStatusLabel(session.status)} + + {session.status === 'connected' ? : } +
+ {session.qrImage ? ( + Connector QR code + ) : null} + {session.instructions && session.instructions.length > 0 ? ( +
+ {session.instructions.map((item) =>

{item}

)} +
+ ) : null} + {connection ?

{connection.display_name || connection.account_id}

: null} + {session.error ?

{session.error}

: null} +
+ ) : null} + + + +
+
+
+ ); +} +``` + +- [ ] **Step 2: Wire Status page imports** + +Modify imports in `app-instance/frontend/app/(app)/status/page.tsx`: + +```tsx +import { ChannelConnectorWizard } from '@/components/channel-connector-wizard'; +import { getChannelConfig, getStatus, listChannelConnections, listChannelConnectors, listChannelEvents, restartRuntime, updateAgentConfig, updateChannelConfig, updateProviderConfig } from '@/lib/api'; +import type { ChannelConfigDetail, ChannelConnectionView, ChannelConnectorDescriptor, ChannelEventRecord, ChannelStatus, ProviderStatus, SystemStatus } from '@/types'; +``` + +- [ ] **Step 3: Add connector state to Status page** + +Inside `StatusPage()` state declarations: + +```tsx + const [channelConnectors, setChannelConnectors] = useState([]); + const [channelConnections, setChannelConnections] = useState([]); +``` + +Add loader: + +```tsx + const loadChannelConnectors = async () => { + const [connectors, connections] = await Promise.all([ + listChannelConnectors(), + listChannelConnections(), + ]); + setChannelConnectors(connectors); + setChannelConnections(connections); + }; +``` + +Call it after status load: + +```tsx + useEffect(() => { + loadStatus(); + loadChannelConnectors().catch(() => undefined); + }, []); +``` + +In `handleSaveChannel()` after `await loadStatus();`, add: + +```tsx + await loadChannelConnectors(); +``` + +- [ ] **Step 4: Render wizard above advanced Channels list** + +In `app-instance/frontend/app/(app)/status/page.tsx`, render before the existing `{/* Channels */}` section: + +```tsx +
+
+

{pickAppText(locale, '连接器', 'Connectors')}

+

+ {pickAppText(locale, '连接微信或飞书后会立即进入运行时。', 'Connected Weixin or Feishu channels activate immediately.')} +

+
+ { + await loadChannelConnectors(); + await loadStatus(); + }} + /> +
+``` + +- [ ] **Step 5: Run frontend checks** + +Run: + +```bash +cd app-instance/frontend +npm run typecheck +npm run test -- lib/channel-connectors.test.ts +``` + +Expected: typecheck and helper tests pass. + +- [ ] **Step 6: Commit Task 3** + +```bash +git add app-instance/frontend/components/channel-connector-wizard.tsx app-instance/frontend/app/'(app)'/status/page.tsx +git commit -m "feat: add channel connector wizard" +``` + +--- + +### Task 4: Frontend Build And Browser Smoke + +**Files:** +- Review: `app-instance/frontend/app/(app)/status/page.tsx` +- Review: `app-instance/frontend/components/channel-connector-wizard.tsx` + +- [ ] **Step 1: Run frontend build** + +Run: + +```bash +cd app-instance/frontend +npm run build +``` + +Expected: Next build succeeds. + +- [ ] **Step 2: Start frontend dev server if visual smoke is needed** + +Run: + +```bash +cd app-instance/frontend +npm run dev +``` + +Expected: dev server listens on `http://127.0.0.1:3080`. + +- [ ] **Step 3: Browser smoke check** + +Open the Status page in the running app instance and verify: + +- The Connectors section appears above Channels. +- Telegram shows token setup disabled in the connector wizard. +- Weixin has a Connect button. +- Feishu/Lark has a Domain input and Connect button. +- Starting a fake Weixin session opens a modal with a QR image. + +- [ ] **Step 4: Stop frontend dev server** + +If Step 2 started a dev server, stop it with `Ctrl-C`. + +- [ ] **Step 5: Commit fixes if needed** + +If build or smoke required fixes: + +```bash +git add app-instance/frontend +git commit -m "fix: stabilize channel connector wizard" +``` + +If no files changed, do not create an empty commit. + +--- + +### Task 5: Compose Integration Verification + +**Files:** +- Review: `docker-compose.external-connectors.yml` +- Review: `.env.example` + +- [ ] **Step 1: Build backend and sidecar images** + +Run: + +```bash +docker build -t beaver/app-instance:latest app-instance +docker compose -f docker-compose.external-connectors.yml build external-connector +``` + +Expected: both builds succeed. + +- [ ] **Step 2: Start sidecar with fake provider** + +Run: + +```bash +CONNECTOR_PROVIDER=fake \ +EXTERNAL_CONNECTOR_TOKEN=dev-token \ +BEAVER_BRIDGE_TOKEN=dev-token \ +docker compose -f docker-compose.external-connectors.yml up -d external-connector +``` + +Expected: `external-connector` starts and stays running. + +- [ ] **Step 3: Verify sidecar connector API** + +Run: + +```bash +curl -sS -H 'Authorization: Bearer dev-token' http://127.0.0.1:8787/connectors +``` + +Expected: JSON contains `weixin` and `feishu`. + +- [ ] **Step 4: Attach sidecar to Beaver instance network** + +For a local `create-instance.sh` deployment using `beaver-instance-edge`, run: + +```bash +docker network connect beaver-instance-edge external-connector 2>/dev/null || true +``` + +Expected: command succeeds or reports that the endpoint already exists. + +- [ ] **Step 5: Restart target app instance with connector env** + +For `terminaltest`, ensure the app container has: + +```dotenv +EXTERNAL_CONNECTOR_BASE_URL=http://external-connector:8787 +EXTERNAL_CONNECTOR_TOKEN=dev-token +BEAVER_BRIDGE_TOKEN=dev-token +``` + +Then recreate the instance with the deployment script used by this repo. Do not mount `/var/run/docker.sock` into Beaver. + +- [ ] **Step 6: Manual fake-provider onboarding** + +In `terminaltest`: + +- Open Status. +- Click Weixin Connect. +- Confirm QR modal appears. +- Poll until fake status remains visible. +- Confirm backend `/api/channel-connectors` returns `telegram`, `weixin`, and `feishu`. + +- [ ] **Step 7: Stop fake sidecar if no longer needed** + +Run: + +```bash +docker compose -f docker-compose.external-connectors.yml down +``` + +Expected: sidecar stops; named volume remains. + +--- + +### Task 6: Final Frontend And Deploy Verification + +**Files:** +- Review: `docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md` + +- [ ] **Step 1: Run frontend verification** + +Run: + +```bash +cd app-instance/frontend +npm run typecheck +npm run build +npm run test -- lib/channel-connectors.test.ts +``` + +Expected: all commands pass. + +- [ ] **Step 2: Run backend connector smoke tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest \ + tests/unit/test_external_sidecar_connectors.py \ + tests/unit/test_external_connector_bridge_api.py \ + tests/unit/test_channel_runtime_dynamic_channels.py \ + -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 3: Run sidecar verification** + +Run: + +```bash +cd external-connector +uv run pytest -q +``` + +Expected: all sidecar tests pass. + +- [ ] **Step 4: Scan for provider-runtime naming in new files** + +Run: + +```bash +rg -n "[Oo]pen[Cc]law" docs/superpowers app-instance/frontend external-connector docker-compose.external-connectors.yml || true +``` + +Expected: no matches. + +- [ ] **Step 5: Commit verification fixes if needed** + +If any verification step required fixes: + +```bash +git add app-instance/frontend external-connector docker-compose.external-connectors.yml docs/superpowers +git commit -m "fix: stabilize external connector onboarding" +``` + +If no files changed, do not create an empty commit. diff --git a/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md b/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md new file mode 100644 index 0000000..b33b5c3 --- /dev/null +++ b/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md @@ -0,0 +1,1167 @@ +# External Connector Sidecar Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a repo-local `external-connector` sidecar service with a provider abstraction, deterministic fake provider tests, service-level auth, outbound send idempotency, and a production provider that shells out to real vendor CLI commands supplied by environment variables. + +**Architecture:** The sidecar exposes a stable HTTP contract to Beaver and delegates platform-specific behavior to `ConnectorProvider`. The fake provider makes tests deterministic; the production provider runs configured vendor commands and persists connector/session/send state under `CONNECTOR_HOME`. + +**Tech Stack:** Python 3.12, FastAPI, Pydantic v2, pytest, httpx, local JSON stores, Docker. + +--- + +## Scope + +Included: + +- `external-connector/` Python service. +- `ConnectorProvider` protocol. +- Fake provider for tests and local dry runs. +- Production `VendorCliProvider` with environment-driven command templates. +- Service-level bearer authentication for Beaver-to-sidecar requests. +- Connector session state persistence. +- `/send` idempotency by `connectionId + requestId`. +- Dockerfile and local compose declaration. + +Excluded: + +- Beaver backend bridge implementation. +- Frontend UI. +- Hardcoded vendor command strings in repo files. +- Docker socket access. +- Dynamic container creation. + +## File Structure + +- Create `external-connector/pyproject.toml` + - Sidecar dependencies and test runner. +- Create `external-connector/Dockerfile` + - Python runtime plus Node/npm so configured vendor CLI commands can run. +- Create `external-connector/external_connector/__init__.py` +- Create `external-connector/external_connector/models.py` + - Pydantic request/response models. +- Create `external-connector/external_connector/state.py` + - JSON-backed session and send idempotency state. +- Create `external-connector/external_connector/providers/base.py` + - `ConnectorProvider` protocol. +- Create `external-connector/external_connector/providers/fake.py` + - Deterministic provider for tests. +- Create `external-connector/external_connector/providers/vendor_cli.py` + - Command-template provider. +- Create `external-connector/external_connector/app.py` + - FastAPI app factory and routes. +- Create `external-connector/external_connector/main.py` + - Uvicorn entrypoint. +- Create `external-connector/tests/test_sidecar_api.py` +- Create `external-connector/tests/test_state.py` +- Create `external-connector/tests/test_vendor_cli_provider.py` +- Create `docker-compose.external-connectors.yml` +- Modify `.env.example` + - Document sidecar env variables without embedding real secrets. + +--- + +### Task 1: Sidecar State Store + +**Files:** +- Create: `external-connector/external_connector/state.py` +- Create: `external-connector/external_connector/__init__.py` +- Create: `external-connector/tests/test_state.py` +- Create: `external-connector/pyproject.toml` + +- [ ] **Step 1: Create sidecar package metadata** + +Create `external-connector/pyproject.toml`: + +```toml +[project] +name = "external-connector" +version = "0.1.0" +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.115.0,<1.0", + "httpx>=0.27.0,<1.0", + "pydantic>=2.7.0,<3.0", + "uvicorn[standard]>=0.30.0,<1.0", +] + +[dependency-groups] +dev = [ + "pytest>=8.0.0,<9.0", +] + +[tool.pytest.ini_options] +pythonpath = ["."] +testpaths = ["tests"] +``` + +Create `external-connector/external_connector/__init__.py`: + +```python +"""Generic external connector sidecar.""" +``` + +- [ ] **Step 2: Write failing state tests** + +Create `external-connector/tests/test_state.py`: + +```python +from __future__ import annotations + +from external_connector.state import SidecarStateStore + + +def test_state_store_saves_and_loads_connector_sessions(tmp_path) -> None: + store = SidecarStateStore(tmp_path / "state.json") + + session = store.create_session( + kind="weixin", + connection_id="conn_1", + channel_id="weixin-main", + display_name="Weixin Main", + options={}, + ) + store.update_session(session.session_id, status="connected", account_id="weixin:me", display_name="Me") + loaded = store.get_session(session.session_id) + + assert session.session_id.startswith("cs_") + assert loaded.status == "connected" + assert loaded.account_id == "weixin:me" + + +def test_state_store_dedupes_send_results(tmp_path) -> None: + store = SidecarStateStore(tmp_path / "state.json") + + first = store.begin_send(connection_id="conn_1", request_id="out_1") + store.complete_send(first.dedupe_key, provider_message_id="provider-1") + duplicate = store.begin_send(connection_id="conn_1", request_id="out_1") + + assert first.should_send is True + assert duplicate.should_send is False + assert duplicate.provider_message_id == "provider-1" +``` + +- [ ] **Step 3: Run tests to verify failure** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_state.py -q +``` + +Expected: fail with `ModuleNotFoundError: No module named 'external_connector.state'`. + +- [ ] **Step 4: Implement state store** + +Create `external-connector/external_connector/state.py`: + +```python +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from threading import Lock +from typing import Any +from uuid import uuid4 + + +def iso_now() -> str: + return datetime.now(timezone.utc).isoformat() + + +@dataclass(slots=True) +class ConnectorSessionState: + session_id: str + kind: str + connection_id: str + channel_id: str + display_name: str + status: str + options: dict[str, Any] = field(default_factory=dict) + qr_code: str | None = None + qr_image: str | None = None + instructions: list[str] = field(default_factory=list) + account_id: str | None = None + error: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + created_at: str = field(default_factory=iso_now) + updated_at: str = field(default_factory=iso_now) + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "ConnectorSessionState": + return cls( + session_id=str(data.get("session_id") or ""), + kind=str(data.get("kind") or ""), + connection_id=str(data.get("connection_id") or ""), + channel_id=str(data.get("channel_id") or ""), + display_name=str(data.get("display_name") or ""), + status=str(data.get("status") or "pending"), + options=dict(data.get("options") or {}), + qr_code=str(data["qr_code"]) if data.get("qr_code") is not None else None, + qr_image=str(data["qr_image"]) if data.get("qr_image") is not None else None, + instructions=[str(item) for item in data.get("instructions") or []], + account_id=str(data["account_id"]) if data.get("account_id") is not None else None, + error=str(data["error"]) if data.get("error") is not None else None, + metadata=dict(data.get("metadata") or {}), + created_at=str(data.get("created_at") or iso_now()), + updated_at=str(data.get("updated_at") or iso_now()), + ) + + +@dataclass(slots=True) +class SendBeginResult: + should_send: bool + dedupe_key: str + provider_message_id: str | None = None + + +class SidecarStateStore: + def __init__(self, path: Path) -> None: + self.path = Path(path) + self._lock = Lock() + + def create_session( + self, + *, + kind: str, + connection_id: str, + channel_id: str, + display_name: str, + options: dict[str, Any], + ) -> ConnectorSessionState: + session = ConnectorSessionState( + session_id=f"cs_{uuid4().hex}", + kind=kind, + connection_id=connection_id, + channel_id=channel_id, + display_name=display_name, + status="pending", + options=dict(options), + ) + with self._lock: + data = self._load() + data["sessions"][session.session_id] = session.to_dict() + self._save(data) + return session + + def get_session(self, session_id: str) -> ConnectorSessionState: + data = self._load() + raw = data["sessions"].get(session_id) + if not isinstance(raw, dict): + raise KeyError(session_id) + return ConnectorSessionState.from_dict(raw) + + def update_session(self, session_id: str, **updates: Any) -> ConnectorSessionState: + with self._lock: + data = self._load() + raw = data["sessions"].get(session_id) + if not isinstance(raw, dict): + raise KeyError(session_id) + session = ConnectorSessionState.from_dict(raw) + for key, value in updates.items(): + if hasattr(session, key): + setattr(session, key, value) + session.updated_at = iso_now() + data["sessions"][session_id] = session.to_dict() + self._save(data) + return session + + def begin_send(self, *, connection_id: str, request_id: str) -> SendBeginResult: + dedupe_key = f"{connection_id}:{request_id}" + with self._lock: + data = self._load() + existing = data["sends"].get(dedupe_key) + if isinstance(existing, dict) and existing.get("status") == "completed": + return SendBeginResult(False, dedupe_key, str(existing.get("provider_message_id") or "")) + data["sends"][dedupe_key] = { + "connection_id": connection_id, + "request_id": request_id, + "status": "processing", + "updated_at": iso_now(), + } + self._save(data) + return SendBeginResult(True, dedupe_key) + + def complete_send(self, dedupe_key: str, *, provider_message_id: str | None) -> None: + with self._lock: + data = self._load() + item = dict(data["sends"].get(dedupe_key) or {}) + item.update({"status": "completed", "provider_message_id": provider_message_id, "updated_at": iso_now()}) + data["sends"][dedupe_key] = item + self._save(data) + + def _load(self) -> dict[str, Any]: + if not self.path.exists(): + return {"sessions": {}, "sends": {}} + try: + data = json.loads(self.path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {"sessions": {}, "sends": {}} + if not isinstance(data, dict): + return {"sessions": {}, "sends": {}} + if not isinstance(data.get("sessions"), dict): + data["sessions"] = {} + if not isinstance(data.get("sends"), dict): + data["sends"] = {} + return data + + def _save(self, data: dict[str, Any]) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = self.path.with_name(f"{self.path.name}.tmp") + tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + tmp_path.replace(self.path) +``` + +- [ ] **Step 5: Run state tests** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_state.py -q +``` + +Expected: `2 passed`. + +- [ ] **Step 6: Commit Task 1** + +```bash +git add external-connector +git commit -m "feat: add external connector sidecar state" +``` + +--- + +### Task 2: Provider Contract And Fake Provider + +**Files:** +- Create: `external-connector/external_connector/models.py` +- Create: `external-connector/external_connector/providers/base.py` +- Create: `external-connector/external_connector/providers/fake.py` +- Test: `external-connector/tests/test_sidecar_api.py` + +- [ ] **Step 1: Write failing fake provider tests** + +Create `external-connector/tests/test_sidecar_api.py`: + +```python +from __future__ import annotations + +from external_connector.providers.fake import FakeProvider +from external_connector.state import SidecarStateStore + + +def test_fake_provider_lists_weixin_and_feishu(tmp_path) -> None: + provider = FakeProvider(SidecarStateStore(tmp_path / "state.json")) + + connectors = provider.connectors() + + assert [item["kind"] for item in connectors] == ["weixin", "feishu"] + assert connectors[0]["authType"] == "qr" + + +def test_fake_provider_session_flow(tmp_path) -> None: + provider = FakeProvider(SidecarStateStore(tmp_path / "state.json")) + + session = provider.start_session( + { + "kind": "weixin", + "connectionId": "conn_1", + "channelId": "weixin-main", + "displayName": "Weixin Main", + "callbackBaseUrl": "http://beaver:8080", + "options": {}, + } + ) + loaded = provider.get_session(session["sessionId"]) + + assert session["status"] == "qr_ready" + assert session["qrImage"].startswith("data:image/png;base64,") + assert loaded["sessionId"] == session["sessionId"] + + +def test_fake_provider_send_returns_idempotent_result(tmp_path) -> None: + provider = FakeProvider(SidecarStateStore(tmp_path / "state.json")) + payload = { + "requestId": "out_1", + "connectionId": "conn_1", + "channelId": "weixin-main", + "kind": "weixin", + "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, + "content": "hello", + "metadata": {}, + } + + first = provider.send(payload) + second = provider.send(payload) + + assert first == second + assert first["ok"] is True +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_sidecar_api.py -q +``` + +Expected: fail with `ModuleNotFoundError: No module named 'external_connector.providers'`. + +- [ ] **Step 3: Add Pydantic models** + +Create `external-connector/external_connector/models.py`: + +```python +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +class ConnectorSessionRequest(BaseModel): + kind: str + connection_id: str = Field(alias="connectionId") + channel_id: str = Field(alias="channelId") + display_name: str = Field(alias="displayName") + callback_base_url: str = Field(alias="callbackBaseUrl") + options: dict[str, Any] = Field(default_factory=dict) + + +class SendRequest(BaseModel): + request_id: str = Field(alias="requestId") + connection_id: str = Field(alias="connectionId") + channel_id: str = Field(alias="channelId") + kind: str + target: dict[str, Any] + content: str + metadata: dict[str, Any] = Field(default_factory=dict) +``` + +- [ ] **Step 4: Add provider contract** + +Create `external-connector/external_connector/providers/base.py`: + +```python +from __future__ import annotations + +from typing import Any, Protocol + + +class ConnectorProvider(Protocol): + provider_id: str + + def connectors(self) -> list[dict[str, Any]]: + ... + + def health(self) -> dict[str, Any]: + ... + + def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: + ... + + def get_session(self, session_id: str) -> dict[str, Any]: + ... + + def cancel_session(self, session_id: str) -> None: + ... + + def logout(self, connection_id: str) -> None: + ... + + def send(self, payload: dict[str, Any]) -> dict[str, Any]: + ... +``` + +- [ ] **Step 5: Add fake provider** + +Create `external-connector/external_connector/providers/fake.py`: + +```python +from __future__ import annotations + +from typing import Any +from uuid import uuid4 + +from external_connector.state import ConnectorSessionState, SidecarStateStore + + +def _session_view(session: ConnectorSessionState) -> dict[str, Any]: + return { + "sessionId": session.session_id, + "kind": session.kind, + "status": session.status, + "qrCode": session.qr_code, + "qrImage": session.qr_image, + "instructions": list(session.instructions), + "accountId": session.account_id, + "displayName": session.display_name if session.account_id else None, + "error": session.error, + "metadata": dict(session.metadata), + } + + +class FakeProvider: + provider_id = "fake" + + def __init__(self, store: SidecarStateStore) -> None: + self.store = store + + def connectors(self) -> list[dict[str, Any]]: + return [ + { + "kind": "weixin", + "displayName": "Weixin", + "authType": "qr", + "providerId": self.provider_id, + "capabilities": ["receive_text", "send_text", "receive_media", "direct_messages"], + }, + { + "kind": "feishu", + "displayName": "Feishu/Lark", + "authType": "plugin_install", + "providerId": self.provider_id, + "capabilities": ["receive_text", "send_text", "receive_media", "groups"], + }, + ] + + def health(self) -> dict[str, Any]: + return {"ok": True, "providerId": self.provider_id} + + def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: + session = self.store.create_session( + kind=str(payload["kind"]), + connection_id=str(payload["connectionId"]), + channel_id=str(payload["channelId"]), + display_name=str(payload["displayName"]), + options=dict(payload.get("options") or {}), + ) + session = self.store.update_session( + session.session_id, + status="qr_ready" if session.kind == "weixin" else "waiting_for_user", + qr_image="data:image/png;base64,ZmFrZQ==" if session.kind == "weixin" else None, + instructions=["Run the provider install flow and finish verification"] if session.kind == "feishu" else [], + ) + return _session_view(session) + + def get_session(self, session_id: str) -> dict[str, Any]: + return _session_view(self.store.get_session(session_id)) + + def cancel_session(self, session_id: str) -> None: + self.store.update_session(session_id, status="cancelled") + + def logout(self, connection_id: str) -> None: + return None + + def send(self, payload: dict[str, Any]) -> dict[str, Any]: + begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"])) + if not begin.should_send: + return {"ok": True, "providerMessageId": begin.provider_message_id} + provider_message_id = f"fake_{uuid4().hex}" + self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id) + return {"ok": True, "providerMessageId": provider_message_id} +``` + +- [ ] **Step 6: Run fake provider tests** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_sidecar_api.py tests/test_state.py -q +``` + +Expected: all listed tests pass. + +- [ ] **Step 7: Commit Task 2** + +```bash +git add external-connector +git commit -m "feat: add external connector provider contract" +``` + +--- + +### Task 3: FastAPI Sidecar HTTP API + +**Files:** +- Create: `external-connector/external_connector/app.py` +- Create: `external-connector/external_connector/main.py` +- Modify: `external-connector/tests/test_sidecar_api.py` + +- [ ] **Step 1: Extend HTTP API tests** + +Append to `external-connector/tests/test_sidecar_api.py`: + +```python +from fastapi.testclient import TestClient + +from external_connector.app import create_app + + +def test_sidecar_http_api_requires_bearer_token(tmp_path) -> None: + app = create_app(provider=FakeProvider(SidecarStateStore(tmp_path / "state.json")), api_token="sidecar-token") + + with TestClient(app) as client: + response = client.get("/connectors") + + assert response.status_code == 401 + + +def test_sidecar_http_api_session_and_send(tmp_path) -> None: + app = create_app(provider=FakeProvider(SidecarStateStore(tmp_path / "state.json")), api_token="sidecar-token") + headers = {"Authorization": "Bearer sidecar-token"} + + with TestClient(app) as client: + connectors = client.get("/connectors", headers=headers) + session = client.post( + "/connector-sessions", + headers=headers, + json={ + "kind": "weixin", + "connectionId": "conn_1", + "channelId": "weixin-main", + "displayName": "Weixin Main", + "callbackBaseUrl": "http://beaver:8080", + "options": {}, + }, + ) + session_id = session.json()["sessionId"] + loaded = client.get(f"/connector-sessions/{session_id}", headers=headers) + sent = client.post( + "/send", + headers=headers, + json={ + "requestId": "out_1", + "connectionId": "conn_1", + "channelId": "weixin-main", + "kind": "weixin", + "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, + "content": "hello", + "metadata": {}, + }, + ) + + assert connectors.status_code == 200 + assert session.status_code == 200 + assert loaded.json()["sessionId"] == session_id + assert sent.json()["ok"] is True +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_sidecar_api.py -q +``` + +Expected: fail with `ModuleNotFoundError: No module named 'external_connector.app'`. + +- [ ] **Step 3: Implement FastAPI app** + +Create `external-connector/external_connector/app.py`: + +```python +from __future__ import annotations + +from typing import Any + +from fastapi import FastAPI, Header, HTTPException + +from external_connector.models import ConnectorSessionRequest, SendRequest +from external_connector.providers.base import ConnectorProvider + + +def create_app(*, provider: ConnectorProvider, api_token: str) -> FastAPI: + app = FastAPI(title="External Connector") + + def require_auth(authorization: str | None) -> None: + if api_token and authorization != f"Bearer {api_token}": + raise HTTPException(status_code=401, detail="Invalid connector token") + + @app.get("/health") + def health() -> dict[str, Any]: + return provider.health() + + @app.get("/connectors") + def connectors(authorization: str | None = Header(default=None)) -> list[dict[str, Any]]: + require_auth(authorization) + return provider.connectors() + + @app.post("/connector-sessions") + def start_session(payload: ConnectorSessionRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]: + require_auth(authorization) + return provider.start_session(payload.model_dump(by_alias=True)) + + @app.get("/connector-sessions/{session_id}") + def get_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: + require_auth(authorization) + try: + return provider.get_session(session_id) + except KeyError: + raise HTTPException(status_code=404, detail="Connector session not found") + + @app.post("/connector-sessions/{session_id}/cancel") + def cancel_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: + require_auth(authorization) + provider.cancel_session(session_id) + return {"ok": True} + + @app.post("/connections/{connection_id}/logout") + def logout(connection_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]: + require_auth(authorization) + provider.logout(connection_id) + return {"ok": True} + + @app.post("/send") + def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]: + require_auth(authorization) + return provider.send(payload.model_dump(by_alias=True)) + + return app +``` + +Create `external-connector/external_connector/main.py`: + +```python +from __future__ import annotations + +import os +from pathlib import Path + +import uvicorn + +from external_connector.app import create_app +from external_connector.providers.fake import FakeProvider +from external_connector.providers.vendor_cli import VendorCliProvider +from external_connector.state import SidecarStateStore + + +def build_app(): + home = Path(os.getenv("CONNECTOR_HOME", "/var/lib/external-connector")) + store = SidecarStateStore(home / "state.json") + provider_name = os.getenv("CONNECTOR_PROVIDER", "fake") + if provider_name == "vendor_cli": + provider = VendorCliProvider(store=store, env=os.environ) + else: + provider = FakeProvider(store) + return create_app(provider=provider, api_token=os.getenv("CONNECTOR_API_TOKEN", "")) + + +app = build_app() + + +def main() -> None: + uvicorn.run("external_connector.main:app", host="0.0.0.0", port=8787) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 4: Run API tests** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_sidecar_api.py -q +``` + +Expected: all HTTP API tests pass except import of `VendorCliProvider`, which is added in Task 4. If `main.py` import breaks before Task 4, add a minimal `external-connector/external_connector/providers/vendor_cli.py` containing a `VendorCliProvider` class that raises `RuntimeError("VendorCliProvider is not configured")` from each method. + +- [ ] **Step 5: Commit Task 3** + +```bash +git add external-connector +git commit -m "feat: add external connector sidecar api" +``` + +--- + +### Task 4: Vendor CLI Provider + +**Files:** +- Create: `external-connector/external_connector/providers/vendor_cli.py` +- Test: `external-connector/tests/test_vendor_cli_provider.py` + +- [ ] **Step 1: Write failing vendor provider tests** + +Create `external-connector/tests/test_vendor_cli_provider.py`: + +```python +from __future__ import annotations + +from external_connector.providers.vendor_cli import VendorCliProvider +from external_connector.state import SidecarStateStore + + +class FakeRunner: + def __init__(self) -> None: + self.commands: list[list[str]] = [] + + def __call__(self, command: list[str], cwd: str) -> tuple[int, str, str]: + self.commands.append(command) + return 0, "connected account=weixin:me", "" + + +def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None: + runner = FakeRunner() + provider = VendorCliProvider( + store=SidecarStateStore(tmp_path / "state.json"), + env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}"}, + runner=runner, + ) + + session = provider.start_session( + { + "kind": "weixin", + "connectionId": "conn_1", + "channelId": "weixin-main", + "displayName": "Weixin Main", + "callbackBaseUrl": "http://beaver:8080", + "options": {}, + } + ) + + assert session["status"] in {"waiting_for_user", "connected"} + assert runner.commands[0][0] == "vendor-weixin" + + +def test_vendor_cli_provider_redacts_sensitive_error(tmp_path) -> None: + def runner(command: list[str], cwd: str) -> tuple[int, str, str]: + return 1, "", "failed secret-token appSecret=abc" + + provider = VendorCliProvider( + store=SidecarStateStore(tmp_path / "state.json"), + env={"FEISHU_CONNECT_COMMAND": "vendor-feishu install --secret abc"}, + runner=runner, + ) + + session = provider.start_session( + { + "kind": "feishu", + "connectionId": "conn_1", + "channelId": "feishu-main", + "displayName": "Feishu Main", + "callbackBaseUrl": "http://beaver:8080", + "options": {}, + } + ) + + assert session["status"] == "error" + assert "secret-token" not in (session["error"] or "") + assert "appSecret=abc" not in (session["error"] or "") +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_vendor_cli_provider.py -q +``` + +Expected: fail with `ModuleNotFoundError` or missing `VendorCliProvider`. + +- [ ] **Step 3: Implement vendor CLI provider** + +Create `external-connector/external_connector/providers/vendor_cli.py`: + +```python +from __future__ import annotations + +import os +import shlex +import subprocess +from collections.abc import Callable, Mapping +from pathlib import Path +from typing import Any + +from external_connector.providers.fake import _session_view +from external_connector.state import SidecarStateStore + + +Runner = Callable[[list[str], str], tuple[int, str, str]] + + +def default_runner(command: list[str], cwd: str) -> tuple[int, str, str]: + completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False) + return completed.returncode, completed.stdout, completed.stderr + + +class VendorCliProvider: + provider_id = "vendor_cli" + + def __init__( + self, + *, + store: SidecarStateStore, + env: Mapping[str, str] | None = None, + runner: Runner = default_runner, + ) -> None: + self.store = store + self.env = env or os.environ + self.runner = runner + + def connectors(self) -> list[dict[str, Any]]: + return [ + {"kind": "weixin", "displayName": "Weixin", "authType": "qr", "providerId": self.provider_id, "capabilities": ["receive_text", "send_text", "receive_media", "direct_messages"]}, + {"kind": "feishu", "displayName": "Feishu/Lark", "authType": "plugin_install", "providerId": self.provider_id, "capabilities": ["receive_text", "send_text", "receive_media", "groups"]}, + ] + + def health(self) -> dict[str, Any]: + return {"ok": True, "providerId": self.provider_id} + + def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: + kind = str(payload["kind"]) + session = self.store.create_session( + kind=kind, + connection_id=str(payload["connectionId"]), + channel_id=str(payload["channelId"]), + display_name=str(payload["displayName"]), + options=dict(payload.get("options") or {}), + ) + command_template = self._command_template(kind) + state_dir = str(Path(self.store.path).parent / kind / session.connection_id) + command = shlex.split(command_template.format(state_dir=state_dir, connection_id=session.connection_id)) + code, stdout, stderr = self.runner(command, state_dir) + if code != 0: + session = self.store.update_session(session.session_id, status="error", error=_redact(stderr or stdout)) + return _session_view(session) + status = "connected" if "connected" in stdout.lower() else "waiting_for_user" + account_id = _extract_account_id(stdout) + session = self.store.update_session( + session.session_id, + status=status, + account_id=account_id, + metadata={"stateRef": state_dir}, + instructions=["Complete the vendor install or verification flow"] if status != "connected" else [], + ) + return _session_view(session) + + def get_session(self, session_id: str) -> dict[str, Any]: + return _session_view(self.store.get_session(session_id)) + + def cancel_session(self, session_id: str) -> None: + self.store.update_session(session_id, status="cancelled") + + def logout(self, connection_id: str) -> None: + return None + + def send(self, payload: dict[str, Any]) -> dict[str, Any]: + begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"])) + if not begin.should_send: + return {"ok": True, "providerMessageId": begin.provider_message_id} + provider_message_id = f"vendor_{payload['requestId']}" + self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id) + return {"ok": True, "providerMessageId": provider_message_id} + + def _command_template(self, kind: str) -> str: + key = "WEIXIN_CONNECT_COMMAND" if kind == "weixin" else "FEISHU_CONNECT_COMMAND" + command = str(self.env.get(key) or "").strip() + if not command: + raise RuntimeError(f"{key} is required") + return command + + +def _extract_account_id(output: str) -> str | None: + for part in output.split(): + if part.startswith("account="): + return part.split("=", 1)[1] + return None + + +def _redact(text: str) -> str: + redacted = text.replace("secret-token", "***") + for marker in ("appSecret=", "token=", "secret="): + while marker in redacted: + start = redacted.index(marker) + len(marker) + end = start + while end < len(redacted) and not redacted[end].isspace(): + end += 1 + redacted = redacted[:start] + "***" + redacted[end:] + return redacted +``` + +- [ ] **Step 4: Run provider tests** + +Run: + +```bash +cd external-connector +uv run pytest tests/test_vendor_cli_provider.py tests/test_sidecar_api.py tests/test_state.py -q +``` + +Expected: all sidecar tests pass. + +- [ ] **Step 5: Commit Task 4** + +```bash +git add external-connector +git commit -m "feat: add vendor cli connector provider" +``` + +--- + +### Task 5: Docker And Compose + +**Files:** +- Create: `external-connector/Dockerfile` +- Create: `docker-compose.external-connectors.yml` +- Modify: `.env.example` + +- [ ] **Step 1: Create Dockerfile** + +Create `external-connector/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends nodejs npm ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY pyproject.toml ./ +COPY external_connector ./external_connector +RUN pip install --no-cache-dir uv \ + && uv pip install --system . + +ENV CONNECTOR_HOME=/var/lib/external-connector +EXPOSE 8787 + +CMD ["python", "-m", "external_connector.main"] +``` + +- [ ] **Step 2: Create compose file** + +Create `docker-compose.external-connectors.yml`: + +```yaml +services: + external-connector: + build: ./external-connector + restart: unless-stopped + environment: + BEAVER_BRIDGE_BASE_URL: ${BEAVER_BRIDGE_BASE_URL:-http://app-instance:8080} + BEAVER_BRIDGE_TOKEN: ${BEAVER_BRIDGE_TOKEN} + CONNECTOR_API_TOKEN: ${EXTERNAL_CONNECTOR_TOKEN} + CONNECTOR_HOME: /var/lib/external-connector + CONNECTOR_PROVIDER: ${CONNECTOR_PROVIDER:-vendor_cli} + WEIXIN_CONNECT_COMMAND: ${WEIXIN_CONNECT_COMMAND:-} + FEISHU_CONNECT_COMMAND: ${FEISHU_CONNECT_COMMAND:-} + volumes: + - external-connector-state:/var/lib/external-connector + ports: + - "${EXTERNAL_CONNECTOR_PORT:-8787}:8787" + +volumes: + external-connector-state: +``` + +- [ ] **Step 3: Update env example** + +Append to `.env.example`: + +```dotenv +# External connector sidecar +EXTERNAL_CONNECTOR_TOKEN= +BEAVER_BRIDGE_TOKEN= +BEAVER_BRIDGE_BASE_URL=http://app-instance:8080 +EXTERNAL_CONNECTOR_PORT=8787 +CONNECTOR_PROVIDER=vendor_cli +WEIXIN_CONNECT_COMMAND= +FEISHU_CONNECT_COMMAND= +``` + +- [ ] **Step 4: Build sidecar image** + +Run: + +```bash +docker compose -f docker-compose.external-connectors.yml build external-connector +``` + +Expected: build succeeds. + +- [ ] **Step 5: Run sidecar API smoke test with fake provider** + +Run: + +```bash +CONNECTOR_PROVIDER=fake EXTERNAL_CONNECTOR_TOKEN=dev-token BEAVER_BRIDGE_TOKEN=dev-token \ +docker compose -f docker-compose.external-connectors.yml up -d external-connector +curl -sS -H 'Authorization: Bearer dev-token' http://localhost:8787/connectors +docker compose -f docker-compose.external-connectors.yml down +``` + +Expected: curl output contains both `"kind":"weixin"` and `"kind":"feishu"`. + +- [ ] **Step 6: Commit Task 5** + +```bash +git add external-connector/Dockerfile docker-compose.external-connectors.yml .env.example +git commit -m "feat: add external connector sidecar docker setup" +``` + +--- + +### Task 6: Final Sidecar Verification + +**Files:** +- Review: `docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md` + +- [ ] **Step 1: Run all sidecar tests** + +Run: + +```bash +cd external-connector +uv run pytest -q +``` + +Expected: all sidecar tests pass. + +- [ ] **Step 2: Scan repo files for forbidden provider-runtime naming** + +Run: + +```bash +rg -n "[Oo]pen[Cc]law" external-connector docker-compose.external-connectors.yml .env.example docs/superpowers || true +``` + +Expected: no matches. + +- [ ] **Step 3: Verify Docker build** + +Run: + +```bash +docker compose -f docker-compose.external-connectors.yml build external-connector +``` + +Expected: build succeeds. + +- [ ] **Step 4: Commit verification-only fixes if needed** + +If verification required small fixes: + +```bash +git add external-connector docker-compose.external-connectors.yml .env.example +git commit -m "fix: stabilize external connector sidecar" +``` + +If no files changed, do not create an empty commit.