diff --git a/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md b/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md index 1e818f0..32aa02b 100644 --- a/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md +++ b/docs/superpowers/plans/2026-06-03-external-connector-backend-runtime.md @@ -315,257 +315,7 @@ git commit -m "feat: add connector bridge dedupe store" --- -### Task 2: External Connector Channel - -**Files:** -- Create: `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py` -- Create: `app-instance/backend/beaver/interfaces/channels/external_connector.py` -- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py` -- Test: `app-instance/backend/tests/unit/test_external_connector_channel.py` - -- [ ] **Step 1: Write failing channel tests** - -Create `app-instance/backend/tests/unit/test_external_connector_channel.py`: - -```python -from __future__ import annotations - -import asyncio - -from beaver.foundation.events import ChannelIdentity, OutboundMessage -from beaver.interfaces.channels.external_connector import ExternalConnectorChannel - - -class FakeSidecarClient: - def __init__(self) -> None: - self.sent: list[dict] = [] - - async def send(self, payload: dict) -> dict: - self.sent.append(payload) - return {"ok": True, "providerMessageId": "provider-1"} - - -def test_external_connector_channel_sends_with_target_and_request_id() -> None: - async def run() -> None: - client = FakeSidecarClient() - channel = ExternalConnectorChannel( - channel_id="weixin-main", - platform_kind="weixin", - connection_id="conn_1", - account_id="weixin:me", - display_name="Weixin Main", - sidecar_client=client, - ) - message = OutboundMessage( - channel="weixin-main", - content="reply", - session_id="s1", - finish_reason="stop", - message_id="out-msg-1", - channel_identity=ChannelIdentity( - channel_id="weixin-main", - kind="weixin", - account_id="weixin:me", - peer_id="peer-1", - peer_type="dm", - thread_id=None, - user_id="sender-1", - message_id="in-msg-1", - ), - ) - - await channel.send(message) - - assert client.sent == [ - { - "requestId": "out_out-msg-1", - "connectionId": "conn_1", - "channelId": "weixin-main", - "kind": "weixin", - "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, - "content": "reply", - "metadata": {"inboundMessageId": "in-msg-1", "sessionId": "s1"}, - } - ] - - asyncio.run(run()) - - -def test_external_connector_channel_requires_identity() -> None: - async def run() -> None: - channel = ExternalConnectorChannel( - channel_id="weixin-main", - platform_kind="weixin", - connection_id="conn_1", - account_id="weixin:me", - display_name="Weixin Main", - sidecar_client=FakeSidecarClient(), - ) - message = OutboundMessage(channel="weixin-main", content="reply", session_id="s1", finish_reason="stop") - - try: - await channel.send(message) - except ValueError as exc: - assert "channel_identity is required" in str(exc) - else: - raise AssertionError("Expected ValueError") - - asyncio.run(run()) -``` - -- [ ] **Step 2: Run tests to verify failure** - -Run: - -```bash -cd app-instance/backend -uv run pytest tests/unit/test_external_connector_channel.py -q -``` - -Expected: fail with `ModuleNotFoundError: No module named 'beaver.interfaces.channels.external_connector'`. - -- [ ] **Step 3: Implement sidecar client** - -Create `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py`: - -```python -from __future__ import annotations - -from typing import Any - -import httpx - - -class ConnectorSidecarClient: - def __init__(self, *, base_url: str, token: str, timeout_seconds: float = 20.0) -> None: - self.base_url = base_url.rstrip("/") - self.token = token - self.timeout_seconds = float(timeout_seconds) - - async def get_connectors(self) -> list[dict[str, Any]]: - return await self._request("GET", "/connectors") - - async def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: - return await self._request("POST", "/connector-sessions", json=payload) - - async def get_session(self, session_id: str) -> dict[str, Any]: - return await self._request("GET", f"/connector-sessions/{session_id}") - - async def cancel_session(self, session_id: str) -> dict[str, Any]: - return await self._request("POST", f"/connector-sessions/{session_id}/cancel", json={}) - - async def logout(self, connection_id: str) -> dict[str, Any]: - return await self._request("POST", f"/connections/{connection_id}/logout", json={}) - - async def send(self, payload: dict[str, Any]) -> dict[str, Any]: - return await self._request("POST", "/send", json=payload) - - async def _request(self, method: str, path: str, *, json: dict[str, Any] | None = None) -> Any: - headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} - async with httpx.AsyncClient(timeout=self.timeout_seconds) as client: - response = await client.request(method, f"{self.base_url}{path}", json=json, headers=headers) - response.raise_for_status() - return response.json() -``` - -- [ ] **Step 4: Implement external channel** - -Create `app-instance/backend/beaver/interfaces/channels/external_connector.py`: - -```python -from __future__ import annotations - -from typing import Any - -from beaver.foundation.events import OutboundMessage -from beaver.interfaces.channels.connections.sidecar_client import ConnectorSidecarClient - - -class ExternalConnectorChannel: - def __init__( - self, - *, - channel_id: str, - platform_kind: str, - connection_id: str, - account_id: str, - display_name: str, - sidecar_client: ConnectorSidecarClient | Any, - ) -> None: - self.channel_id = channel_id - self.kind = "external_connector" - self.mode = "http" - self.platform_kind = platform_kind - self.connection_id = connection_id - self.account_id = account_id - self.display_name = display_name or channel_id - self.sidecar_client = sidecar_client - self.started = False - - async def start(self) -> None: - self.started = True - - async def stop(self) -> None: - self.started = False - - async def send(self, message: OutboundMessage) -> None: - identity = message.channel_identity - if identity is None: - raise ValueError("channel_identity is required for external connector sends") - payload = { - "requestId": _request_id(message), - "connectionId": self.connection_id, - "channelId": self.channel_id, - "kind": self.platform_kind, - "target": { - "peerId": identity.peer_id, - "peerType": identity.peer_type, - "threadId": identity.thread_id, - }, - "content": message.content, - "metadata": { - "inboundMessageId": identity.message_id, - "sessionId": message.session_id, - }, - } - await self.sidecar_client.send(payload) - - -def _request_id(message: OutboundMessage) -> str: - return f"out_{message.message_id}" -``` - -- [ ] **Step 5: Export channel symbol** - -Modify `app-instance/backend/beaver/interfaces/channels/__init__.py`: - -```python -from .external_connector import ExternalConnectorChannel -``` - -Add `ExternalConnectorChannel` to `__all__`. - -- [ ] **Step 6: Run channel tests** - -Run: - -```bash -cd app-instance/backend -uv run pytest tests/unit/test_external_connector_channel.py -q -``` - -Expected: `2 passed`. - -- [ ] **Step 7: Commit Task 2** - -```bash -git add app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py app-instance/backend/beaver/interfaces/channels/external_connector.py app-instance/backend/beaver/interfaces/channels/__init__.py app-instance/backend/tests/unit/test_external_connector_channel.py -git commit -m "feat: add external connector channel" -``` - ---- - -### Task 3: Dynamic Runtime Channels +### Task 2: Dynamic Runtime Channels **Files:** - Modify: `app-instance/backend/beaver/interfaces/channels/manager.py` @@ -716,7 +466,7 @@ Add methods to `ChannelRuntime`: if current == config and channel_id in self.adapters: return if not config.enabled: - await self.remove_channel(channel_id) + await self._remove_channel_locked(channel_id) self.channel_configs[channel_id] = config self.states[channel_id] = {"state": "disabled", "last_error": None} return @@ -733,16 +483,17 @@ Add methods to `ChannelRuntime`: async def remove_channel(self, channel_id: str) -> None: async with self._lifecycle_lock: - adapter = self.adapters.pop(channel_id, None) - self.manager.unregister(channel_id) - self.channel_configs.pop(channel_id, None) - if adapter is not None: - await adapter.stop() - self.events.record(channel_id=channel_id, kind="adapter_stopped") - self.states[channel_id] = {"state": "removed", "last_error": None} -``` + await self._remove_channel_locked(channel_id) -If this direct implementation deadlocks because `add_channel()` calls `remove_channel()` under the same lock, split the locked removal body into a private `_remove_channel_locked()` helper and call that from both public methods. + async def _remove_channel_locked(self, channel_id: str) -> None: + adapter = self.adapters.pop(channel_id, None) + self.manager.unregister(channel_id) + self.channel_configs.pop(channel_id, None) + if adapter is not None: + await adapter.stop() + self.events.record(channel_id=channel_id, kind="adapter_stopped") + self.states[channel_id] = {"state": "removed", "last_error": None} +``` - [ ] **Step 5: Run dynamic runtime tests** @@ -755,7 +506,7 @@ uv run pytest tests/unit/test_channel_runtime_dynamic_channels.py tests/unit/tes Expected: all listed tests pass. -- [ ] **Step 6: Commit Task 3** +- [ ] **Step 6: Commit Task 2** ```bash git add app-instance/backend/beaver/interfaces/channels/manager.py app-instance/backend/beaver/interfaces/channels/runtime.py app-instance/backend/tests/unit/test_channel_runtime_dynamic_channels.py @@ -764,6 +515,301 @@ git commit -m "feat: support dynamic runtime channels" --- +### Task 3: External Connector Channel + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py` +- Create: `app-instance/backend/beaver/interfaces/channels/external_connector.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py` +- Test: `app-instance/backend/tests/unit/test_external_connector_channel.py` + +- [ ] **Step 1: Write failing channel tests** + +Create `app-instance/backend/tests/unit/test_external_connector_channel.py`: + +```python +from __future__ import annotations + +import asyncio + +from beaver.foundation.events import ChannelIdentity, OutboundMessage +from beaver.interfaces.channels.external_connector import ExternalConnectorChannel, _request_id + + +class FakeSidecarClient: + def __init__(self) -> None: + self.sent: list[dict] = [] + + async def send(self, payload: dict) -> dict: + self.sent.append(payload) + return {"ok": True, "providerMessageId": "provider-1"} + + +def test_external_connector_channel_sends_with_target_and_request_id() -> None: + async def run() -> None: + client = FakeSidecarClient() + channel = ExternalConnectorChannel( + channel_id="weixin-main", + platform_kind="weixin", + connection_id="conn_1", + account_id="weixin:me", + display_name="Weixin Main", + sidecar_client=client, + ) + message = OutboundMessage( + channel="weixin-main", + content="reply", + session_id="s1", + finish_reason="stop", + message_id="out-msg-1", + channel_identity=ChannelIdentity( + channel_id="weixin-main", + kind="weixin", + account_id="weixin:me", + peer_id="peer-1", + peer_type="dm", + thread_id=None, + user_id="sender-1", + message_id="in-msg-1", + ), + ) + + await channel.send(message) + + assert client.sent == [ + { + "requestId": "out_weixin-main:s1:out-msg-1", + "connectionId": "conn_1", + "channelId": "weixin-main", + "kind": "weixin", + "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, + "content": "reply", + "metadata": {"inboundMessageId": "in-msg-1", "sessionId": "s1"}, + } + ] + + asyncio.run(run()) + + +def test_external_connector_request_id_falls_back_when_message_id_is_none_or_blank() -> None: + identity = ChannelIdentity( + channel_id="weixin-main", + kind="weixin", + account_id="weixin:me", + peer_id="peer-1", + peer_type="dm", + message_id="in-msg-1", + ) + first = OutboundMessage( + channel="weixin-main", + content="same reply", + session_id="s1", + finish_reason="stop", + message_id=None, # type: ignore[arg-type] + channel_identity=identity, + ) + second = OutboundMessage( + channel="weixin-main", + content="same reply", + session_id="s1", + finish_reason="stop", + message_id="", + channel_identity=identity, + ) + + assert _request_id(first) == _request_id(second) + assert _request_id(first).startswith("out_weixin-main:s1:") + + +def test_external_connector_channel_requires_identity() -> None: + async def run() -> None: + channel = ExternalConnectorChannel( + channel_id="weixin-main", + platform_kind="weixin", + connection_id="conn_1", + account_id="weixin:me", + display_name="Weixin Main", + sidecar_client=FakeSidecarClient(), + ) + message = OutboundMessage(channel="weixin-main", content="reply", session_id="s1", finish_reason="stop") + + try: + await channel.send(message) + except ValueError as exc: + assert "channel_identity is required" in str(exc) + else: + raise AssertionError("Expected ValueError") + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_channel.py -q +``` + +Expected: fail with `ModuleNotFoundError: No module named 'beaver.interfaces.channels.external_connector'`. + +- [ ] **Step 3: Implement sidecar client** + +Create `app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py`: + +```python +from __future__ import annotations + +import hashlib +from typing import Any + +import httpx + + +class ConnectorSidecarClient: + def __init__(self, *, base_url: str, token: str, timeout_seconds: float = 20.0) -> None: + self.base_url = base_url.rstrip("/") + self.token = token + self.timeout_seconds = float(timeout_seconds) + + async def get_connectors(self) -> list[dict[str, Any]]: + return await self._request("GET", "/connectors") + + async def start_session(self, payload: dict[str, Any]) -> dict[str, Any]: + return await self._request("POST", "/connector-sessions", json=payload) + + async def get_session(self, session_id: str) -> dict[str, Any]: + return await self._request("GET", f"/connector-sessions/{session_id}") + + async def cancel_session(self, session_id: str) -> dict[str, Any]: + return await self._request("POST", f"/connector-sessions/{session_id}/cancel", json={}) + + async def logout(self, connection_id: str) -> dict[str, Any]: + return await self._request("POST", f"/connections/{connection_id}/logout", json={}) + + async def send(self, payload: dict[str, Any]) -> dict[str, Any]: + return await self._request("POST", "/send", json=payload) + + async def _request(self, method: str, path: str, *, json: dict[str, Any] | None = None) -> Any: + headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} + async with httpx.AsyncClient(timeout=self.timeout_seconds) as client: + response = await client.request(method, f"{self.base_url}{path}", json=json, headers=headers) + response.raise_for_status() + return response.json() +``` + +- [ ] **Step 4: Implement external channel** + +Create `app-instance/backend/beaver/interfaces/channels/external_connector.py`: + +```python +from __future__ import annotations + +from typing import Any + +from beaver.foundation.events import OutboundMessage +from beaver.interfaces.channels.connections.sidecar_client import ConnectorSidecarClient + + +class ExternalConnectorChannel: + def __init__( + self, + *, + channel_id: str, + platform_kind: str, + connection_id: str, + account_id: str, + display_name: str, + sidecar_client: ConnectorSidecarClient | Any, + ) -> None: + self.channel_id = channel_id + self.kind = "external_connector" + self.mode = "http" + self.platform_kind = platform_kind + self.connection_id = connection_id + self.account_id = account_id + self.display_name = display_name or channel_id + self.sidecar_client = sidecar_client + self.started = False + + async def start(self) -> None: + self.started = True + + async def stop(self) -> None: + self.started = False + + async def send(self, message: OutboundMessage) -> None: + identity = message.channel_identity + if identity is None: + raise ValueError("channel_identity is required for external connector sends") + payload = { + "requestId": _request_id(message), + "connectionId": self.connection_id, + "channelId": self.channel_id, + "kind": self.platform_kind, + "target": { + "peerId": identity.peer_id, + "peerType": identity.peer_type, + "threadId": identity.thread_id, + }, + "content": message.content, + "metadata": { + "inboundMessageId": identity.message_id, + "sessionId": message.session_id, + }, + } + await self.sidecar_client.send(payload) + + +def _request_id(message: OutboundMessage) -> str: + identity = message.channel_identity + channel = message.channel or (identity.channel_id if identity else "unknown") + session_id = message.session_id or (identity.session_id() if identity else "unknown") + message_id = str(message.message_id or "").strip() + if not message_id: + basis = "|".join( + [ + message.content, + identity.message_id if identity and identity.message_id else "", + identity.peer_id if identity else "", + message.finish_reason, + ] + ) + message_id = hashlib.sha256(basis.encode("utf-8")).hexdigest()[:24] + return f"out_{channel}:{session_id}:{message_id}" +``` + +- [ ] **Step 5: Export channel symbol** + +Modify `app-instance/backend/beaver/interfaces/channels/__init__.py`: + +```python +from .external_connector import ExternalConnectorChannel +``` + +Add `ExternalConnectorChannel` to `__all__`. + +- [ ] **Step 6: Run channel tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_external_connector_channel.py -q +``` + +Expected: `2 passed`. + +- [ ] **Step 7: Commit Task 3** + +```bash +git add app-instance/backend/beaver/interfaces/channels/connections/sidecar_client.py app-instance/backend/beaver/interfaces/channels/external_connector.py app-instance/backend/beaver/interfaces/channels/__init__.py app-instance/backend/tests/unit/test_external_connector_channel.py +git commit -m "feat: add external connector channel" +``` + +--- + ### Task 4: Runtime Factory For External Connector Channel **Files:** diff --git a/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md b/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md index b33b5c3..354468f 100644 --- a/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md +++ b/docs/superpowers/plans/2026-06-03-external-connector-sidecar.md @@ -20,7 +20,7 @@ Included: - Production `VendorCliProvider` with environment-driven command templates. - Service-level bearer authentication for Beaver-to-sidecar requests. - Connector session state persistence. -- `/send` idempotency by `connectionId + requestId`. +- `/send` idempotency by `connectionId + requestId`, including processing TTL retry semantics. - Dockerfile and local compose declaration. Excluded: @@ -28,9 +28,20 @@ Excluded: - Beaver backend bridge implementation. - Frontend UI. - Hardcoded vendor command strings in repo files. +- Accepting command strings from frontend or sidecar HTTP request bodies. - Docker socket access. - Dynamic container creation. +## Vendor Command Safety Contract + +`VendorCliProvider` may execute vendor install/send commands because the sidecar is a controlled deployment container, but command execution has fixed boundaries: + +- Command templates only come from sidecar startup environment variables. +- No frontend or HTTP API payload can supply or override command strings. +- `cwd` is fixed to `CONNECTOR_HOME`; per-connection state paths are passed as formatted arguments only. +- Every command uses a hard timeout from `CONNECTOR_COMMAND_TIMEOUT_SECONDS`, defaulting to 120 seconds. +- stdout and stderr are redacted before being stored or returned. + ## File Structure - Create `external-connector/pyproject.toml` @@ -138,7 +149,31 @@ def test_state_store_dedupes_send_results(tmp_path) -> None: assert first.should_send is True assert duplicate.should_send is False + assert duplicate.status == "completed" + assert duplicate.http_status == 200 assert duplicate.provider_message_id == "provider-1" + + +def test_state_store_returns_conflict_for_active_send_processing(tmp_path) -> None: + store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=60) + + store.begin_send(connection_id="conn_1", request_id="out_1") + duplicate = store.begin_send(connection_id="conn_1", request_id="out_1") + + assert duplicate.should_send is False + assert duplicate.status == "processing" + assert duplicate.http_status == 409 + assert duplicate.retry_after_seconds == 5 + + +def test_state_store_retries_stale_send_processing(tmp_path) -> None: + store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=0) + + store.begin_send(connection_id="conn_1", request_id="out_1") + retry = store.begin_send(connection_id="conn_1", request_id="out_1") + + assert retry.should_send is True + assert retry.status == "processing" ``` - [ ] **Step 3: Run tests to verify failure** @@ -218,12 +253,16 @@ class ConnectorSessionState: class SendBeginResult: should_send: bool dedupe_key: str + status: str + http_status: int + retry_after_seconds: int | None = None provider_message_id: str | None = None class SidecarStateStore: - def __init__(self, path: Path) -> None: + def __init__(self, path: Path, *, send_processing_ttl_seconds: int = 60) -> None: self.path = Path(path) + self.send_processing_ttl_seconds = int(send_processing_ttl_seconds) self._lock = Lock() def create_session( @@ -277,8 +316,12 @@ class SidecarStateStore: with self._lock: data = self._load() existing = data["sends"].get(dedupe_key) - if isinstance(existing, dict) and existing.get("status") == "completed": - return SendBeginResult(False, dedupe_key, str(existing.get("provider_message_id") or "")) + if isinstance(existing, dict): + status = str(existing.get("status") or "processing") + if status == "completed": + return SendBeginResult(False, dedupe_key, "completed", 200, None, str(existing.get("provider_message_id") or "")) + if status == "processing" and not self._send_is_stale(existing): + return SendBeginResult(False, dedupe_key, "processing", 409, 5) data["sends"][dedupe_key] = { "connection_id": connection_id, "request_id": request_id, @@ -286,7 +329,7 @@ class SidecarStateStore: "updated_at": iso_now(), } self._save(data) - return SendBeginResult(True, dedupe_key) + return SendBeginResult(True, dedupe_key, "processing", 200) def complete_send(self, dedupe_key: str, *, provider_message_id: str | None) -> None: with self._lock: @@ -296,6 +339,11 @@ class SidecarStateStore: data["sends"][dedupe_key] = item self._save(data) + def _send_is_stale(self, item: dict[str, Any]) -> bool: + updated_at = str(item.get("updated_at") or iso_now()) + updated = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) + return (datetime.now(timezone.utc) - updated).total_seconds() >= self.send_processing_ttl_seconds + def _load(self) -> dict[str, Any]: if not self.path.exists(): return {"sessions": {}, "sends": {}} @@ -565,6 +613,8 @@ class FakeProvider: def send(self, payload: dict[str, Any]) -> dict[str, Any]: begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"])) if not begin.should_send: + if begin.http_status == 409: + return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409} return {"ok": True, "providerMessageId": begin.provider_message_id} provider_message_id = f"fake_{uuid4().hex}" self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id) @@ -655,6 +705,31 @@ def test_sidecar_http_api_session_and_send(tmp_path) -> None: assert session.status_code == 200 assert loaded.json()["sessionId"] == session_id assert sent.json()["ok"] is True + + +def test_sidecar_http_api_returns_conflict_for_processing_send(tmp_path) -> None: + store = SidecarStateStore(tmp_path / "state.json", send_processing_ttl_seconds=60) + store.begin_send(connection_id="conn_1", request_id="out_1") + app = create_app(provider=FakeProvider(store), api_token="sidecar-token") + headers = {"Authorization": "Bearer sidecar-token"} + + with TestClient(app) as client: + response = client.post( + "/send", + headers=headers, + json={ + "requestId": "out_1", + "connectionId": "conn_1", + "channelId": "weixin-main", + "kind": "weixin", + "target": {"peerId": "peer-1", "peerType": "dm", "threadId": None}, + "content": "hello", + "metadata": {}, + }, + ) + + assert response.status_code == 409 + assert response.json()["retryAfterSeconds"] == 5 ``` - [ ] **Step 2: Run tests to verify failure** @@ -678,6 +753,7 @@ from __future__ import annotations from typing import Any from fastapi import FastAPI, Header, HTTPException +from fastapi.responses import JSONResponse from external_connector.models import ConnectorSessionRequest, SendRequest from external_connector.providers.base import ConnectorProvider @@ -725,9 +801,13 @@ def create_app(*, provider: ConnectorProvider, api_token: str) -> FastAPI: return {"ok": True} @app.post("/send") - def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]: + def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> JSONResponse | dict[str, Any]: require_auth(authorization) - return provider.send(payload.model_dump(by_alias=True)) + result = dict(provider.send(payload.model_dump(by_alias=True))) + status_code = int(result.pop("httpStatus", 200)) + if status_code != 200: + return JSONResponse(status_code=status_code, content=result) + return result return app ``` @@ -810,9 +890,13 @@ from external_connector.state import SidecarStateStore class FakeRunner: def __init__(self) -> None: self.commands: list[list[str]] = [] + self.cwd: str | None = None + self.timeout: float | None = None - def __call__(self, command: list[str], cwd: str) -> tuple[int, str, str]: + def __call__(self, command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]: self.commands.append(command) + self.cwd = cwd + self.timeout = timeout return 0, "connected account=weixin:me", "" @@ -820,7 +904,7 @@ def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None: runner = FakeRunner() provider = VendorCliProvider( store=SidecarStateStore(tmp_path / "state.json"), - env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}"}, + env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}", "CONNECTOR_COMMAND_TIMEOUT_SECONDS": "30"}, runner=runner, ) @@ -837,10 +921,12 @@ def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None: assert session["status"] in {"waiting_for_user", "connected"} assert runner.commands[0][0] == "vendor-weixin" + assert runner.cwd == str(tmp_path) + assert runner.timeout == 30.0 def test_vendor_cli_provider_redacts_sensitive_error(tmp_path) -> None: - def runner(command: list[str], cwd: str) -> tuple[int, str, str]: + def runner(command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]: return 1, "", "failed secret-token appSecret=abc" provider = VendorCliProvider( @@ -894,11 +980,11 @@ from external_connector.providers.fake import _session_view from external_connector.state import SidecarStateStore -Runner = Callable[[list[str], str], tuple[int, str, str]] +Runner = Callable[[list[str], str, float], tuple[int, str, str]] -def default_runner(command: list[str], cwd: str) -> tuple[int, str, str]: - completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False) +def default_runner(command: list[str], cwd: str, timeout: float) -> tuple[int, str, str]: + completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False, timeout=timeout) return completed.returncode, completed.stdout, completed.stderr @@ -915,6 +1001,7 @@ class VendorCliProvider: self.store = store self.env = env or os.environ self.runner = runner + self.command_timeout_seconds = float(self.env.get("CONNECTOR_COMMAND_TIMEOUT_SECONDS") or 120) def connectors(self) -> list[dict[str, Any]]: return [ @@ -935,9 +1022,18 @@ class VendorCliProvider: options=dict(payload.get("options") or {}), ) command_template = self._command_template(kind) - state_dir = str(Path(self.store.path).parent / kind / session.connection_id) + connector_home = Path(self.store.path).parent + state_dir = str(connector_home / kind / session.connection_id) + Path(state_dir).mkdir(parents=True, exist_ok=True) command = shlex.split(command_template.format(state_dir=state_dir, connection_id=session.connection_id)) - code, stdout, stderr = self.runner(command, state_dir) + try: + code, stdout, stderr = self.runner(command, str(connector_home), self.command_timeout_seconds) + except subprocess.TimeoutExpired: + session = self.store.update_session(session.session_id, status="error", error="Provider command timed out") + return _session_view(session) + except Exception as exc: + session = self.store.update_session(session.session_id, status="error", error=_redact(str(exc))) + return _session_view(session) if code != 0: session = self.store.update_session(session.session_id, status="error", error=_redact(stderr or stdout)) return _session_view(session) @@ -964,6 +1060,8 @@ class VendorCliProvider: def send(self, payload: dict[str, Any]) -> dict[str, Any]: begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"])) if not begin.should_send: + if begin.http_status == 409: + return {"ok": False, "status": begin.status, "retryAfterSeconds": begin.retry_after_seconds, "httpStatus": 409} return {"ok": True, "providerMessageId": begin.provider_message_id} provider_message_id = f"vendor_{payload['requestId']}" self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id) @@ -1061,6 +1159,7 @@ services: CONNECTOR_API_TOKEN: ${EXTERNAL_CONNECTOR_TOKEN} CONNECTOR_HOME: /var/lib/external-connector CONNECTOR_PROVIDER: ${CONNECTOR_PROVIDER:-vendor_cli} + CONNECTOR_COMMAND_TIMEOUT_SECONDS: ${CONNECTOR_COMMAND_TIMEOUT_SECONDS:-120} WEIXIN_CONNECT_COMMAND: ${WEIXIN_CONNECT_COMMAND:-} FEISHU_CONNECT_COMMAND: ${FEISHU_CONNECT_COMMAND:-} volumes: @@ -1083,6 +1182,7 @@ BEAVER_BRIDGE_TOKEN= BEAVER_BRIDGE_BASE_URL=http://app-instance:8080 EXTERNAL_CONNECTOR_PORT=8787 CONNECTOR_PROVIDER=vendor_cli +CONNECTOR_COMMAND_TIMEOUT_SECONDS=120 WEIXIN_CONNECT_COMMAND= FEISHU_CONNECT_COMMAND= ``` diff --git a/docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md b/docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md index b6c851e..317d9fb 100644 --- a/docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md +++ b/docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md @@ -99,6 +99,15 @@ Initial provider: - `VendorCliProvider`: runs the real CLI/plugin commands required by the current Weixin and Feishu/Lark vendor flows. +`VendorCliProvider` command execution is intentionally constrained: + +- Command templates are read only from sidecar startup environment variables. +- Frontend requests and sidecar HTTP request bodies cannot provide command strings. +- Command working directory is fixed to `CONNECTOR_HOME`. +- Per-connection state paths may be passed to commands as formatted arguments. +- Every command has a hard timeout. +- stdout and stderr are redacted before storage or API responses. + Future providers can be added without changing Beaver runtime code: - `WechatyProvider` @@ -232,6 +241,7 @@ services: CONNECTOR_API_TOKEN: ${EXTERNAL_CONNECTOR_TOKEN} CONNECTOR_HOME: /var/lib/external-connector CONNECTOR_PROVIDER: vendor_cli + CONNECTOR_COMMAND_TIMEOUT_SECONDS: 120 volumes: - external-connector-state:/var/lib/external-connector ``` @@ -347,7 +357,17 @@ Allowed connector session statuses: } ``` -`requestId` is required. Beaver must generate a stable request id for each outbound delivery attempt from the outbound message identity, and must reuse the same `requestId` if the same outbound delivery is retried. The sidecar dedupes `connectionId + requestId`; duplicate requests return the original send result and must not send a second platform message. +`requestId` is required. Beaver must generate a stable request id for each outbound delivery attempt and must reuse the same `requestId` if the same outbound delivery is retried. The first-version rule is: + +```text +out_{channel}:{session_id}:{message_id or sha256(content + inbound_message_id + peer_id + finish_reason)} +``` + +The sidecar dedupes `connectionId + requestId`: + +- `completed`: return the original send result and do not send a second platform message. +- `processing` updated less than 60 seconds ago: return `409 Conflict` with `{"retryAfterSeconds": 5}` so Beaver retries later. +- `processing` updated 60 seconds or more ago: treat as stale and retry the provider send. ## Beaver Bridge API @@ -497,7 +517,8 @@ The old `/api/channels` static config editor may remain for advanced runtime con - Duplicate completed bridge event: return idempotent success and do not call runtime again. - Duplicate in-flight bridge event: return `409 Conflict` until the 60-second processing TTL expires, then allow one reprocess. - Outbound send failure: mark outbound delivery failed and record connector error. -- Duplicate outbound send `requestId`: sidecar returns the original send result and does not send a second platform message. +- Duplicate completed outbound send `requestId`: sidecar returns the original send result and does not send a second platform message. +- Duplicate in-flight outbound send `requestId`: sidecar returns `409 Conflict` until the 60-second processing TTL expires, then allows one retry. - Sidecar restart: persisted provider state should survive through sidecar volume. ## Security @@ -507,6 +528,7 @@ The old `/api/channels` static config editor may remain for advanced runtime con - Sidecar can only call bridge endpoints with the service-level bridge token. - Beaver can only call sidecar control and send endpoints with the service-level connector token. - Sidecar state volume contains login state and must be treated as sensitive. +- Vendor command strings are deployment configuration, not user input. - Feishu user-identity mode has stronger privacy risk than bot-identity mode; UI must label it clearly if exposed. ## Testing @@ -533,6 +555,7 @@ Sidecar tests: - fake provider status transitions - provider command runner error redaction - send idempotency for duplicate `connectionId + requestId` +- send `processing` TTL returns `409 Conflict` before stale retry Frontend tests: