36 KiB
External Connector Sidecar Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Add a repo-local external-connector sidecar service with a provider abstraction, deterministic fake provider tests, service-level auth, outbound send idempotency, and a production provider that shells out to real vendor CLI commands supplied by environment variables.
Architecture: The sidecar exposes a stable HTTP contract to Beaver and delegates platform-specific behavior to ConnectorProvider. The fake provider makes tests deterministic; the production provider runs configured vendor commands and persists connector/session/send state under CONNECTOR_HOME.
Tech Stack: Python 3.12, FastAPI, Pydantic v2, pytest, httpx, local JSON stores, Docker.
Scope
Included:
external-connector/Python service.ConnectorProviderprotocol.- Fake provider for tests and local dry runs.
- Production
VendorCliProviderwith environment-driven command templates. - Service-level bearer authentication for Beaver-to-sidecar requests.
- Connector session state persistence.
/sendidempotency byconnectionId + requestId.- Dockerfile and local compose declaration.
Excluded:
- Beaver backend bridge implementation.
- Frontend UI.
- Hardcoded vendor command strings in repo files.
- Docker socket access.
- Dynamic container creation.
File Structure
- Create
external-connector/pyproject.toml- Sidecar dependencies and test runner.
- Create
external-connector/Dockerfile- Python runtime plus Node/npm so configured vendor CLI commands can run.
- Create
external-connector/external_connector/__init__.py - Create
external-connector/external_connector/models.py- Pydantic request/response models.
- Create
external-connector/external_connector/state.py- JSON-backed session and send idempotency state.
- Create
external-connector/external_connector/providers/base.pyConnectorProviderprotocol.
- Create
external-connector/external_connector/providers/fake.py- Deterministic provider for tests.
- Create
external-connector/external_connector/providers/vendor_cli.py- Command-template provider.
- Create
external-connector/external_connector/app.py- FastAPI app factory and routes.
- Create
external-connector/external_connector/main.py- Uvicorn entrypoint.
- Create
external-connector/tests/test_sidecar_api.py - Create
external-connector/tests/test_state.py - Create
external-connector/tests/test_vendor_cli_provider.py - Create
docker-compose.external-connectors.yml - Modify
.env.example- Document sidecar env variables without embedding real secrets.
Task 1: Sidecar State Store
Files:
-
Create:
external-connector/external_connector/state.py -
Create:
external-connector/external_connector/__init__.py -
Create:
external-connector/tests/test_state.py -
Create:
external-connector/pyproject.toml -
Step 1: Create sidecar package metadata
Create external-connector/pyproject.toml:
[project]
name = "external-connector"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115.0,<1.0",
"httpx>=0.27.0,<1.0",
"pydantic>=2.7.0,<3.0",
"uvicorn[standard]>=0.30.0,<1.0",
]
[dependency-groups]
dev = [
"pytest>=8.0.0,<9.0",
]
[tool.pytest.ini_options]
pythonpath = ["."]
testpaths = ["tests"]
Create external-connector/external_connector/__init__.py:
"""Generic external connector sidecar."""
- Step 2: Write failing state tests
Create external-connector/tests/test_state.py:
from __future__ import annotations
from external_connector.state import SidecarStateStore
def test_state_store_saves_and_loads_connector_sessions(tmp_path) -> None:
store = SidecarStateStore(tmp_path / "state.json")
session = store.create_session(
kind="weixin",
connection_id="conn_1",
channel_id="weixin-main",
display_name="Weixin Main",
options={},
)
store.update_session(session.session_id, status="connected", account_id="weixin:me", display_name="Me")
loaded = store.get_session(session.session_id)
assert session.session_id.startswith("cs_")
assert loaded.status == "connected"
assert loaded.account_id == "weixin:me"
def test_state_store_dedupes_send_results(tmp_path) -> None:
store = SidecarStateStore(tmp_path / "state.json")
first = store.begin_send(connection_id="conn_1", request_id="out_1")
store.complete_send(first.dedupe_key, provider_message_id="provider-1")
duplicate = store.begin_send(connection_id="conn_1", request_id="out_1")
assert first.should_send is True
assert duplicate.should_send is False
assert duplicate.provider_message_id == "provider-1"
- Step 3: Run tests to verify failure
Run:
cd external-connector
uv run pytest tests/test_state.py -q
Expected: fail with ModuleNotFoundError: No module named 'external_connector.state'.
- Step 4: Implement state store
Create external-connector/external_connector/state.py:
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any
from uuid import uuid4
def iso_now() -> str:
return datetime.now(timezone.utc).isoformat()
@dataclass(slots=True)
class ConnectorSessionState:
session_id: str
kind: str
connection_id: str
channel_id: str
display_name: str
status: str
options: dict[str, Any] = field(default_factory=dict)
qr_code: str | None = None
qr_image: str | None = None
instructions: list[str] = field(default_factory=list)
account_id: str | None = None
error: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
created_at: str = field(default_factory=iso_now)
updated_at: str = field(default_factory=iso_now)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ConnectorSessionState":
return cls(
session_id=str(data.get("session_id") or ""),
kind=str(data.get("kind") or ""),
connection_id=str(data.get("connection_id") or ""),
channel_id=str(data.get("channel_id") or ""),
display_name=str(data.get("display_name") or ""),
status=str(data.get("status") or "pending"),
options=dict(data.get("options") or {}),
qr_code=str(data["qr_code"]) if data.get("qr_code") is not None else None,
qr_image=str(data["qr_image"]) if data.get("qr_image") is not None else None,
instructions=[str(item) for item in data.get("instructions") or []],
account_id=str(data["account_id"]) if data.get("account_id") is not None else None,
error=str(data["error"]) if data.get("error") is not None else None,
metadata=dict(data.get("metadata") or {}),
created_at=str(data.get("created_at") or iso_now()),
updated_at=str(data.get("updated_at") or iso_now()),
)
@dataclass(slots=True)
class SendBeginResult:
should_send: bool
dedupe_key: str
provider_message_id: str | None = None
class SidecarStateStore:
def __init__(self, path: Path) -> None:
self.path = Path(path)
self._lock = Lock()
def create_session(
self,
*,
kind: str,
connection_id: str,
channel_id: str,
display_name: str,
options: dict[str, Any],
) -> ConnectorSessionState:
session = ConnectorSessionState(
session_id=f"cs_{uuid4().hex}",
kind=kind,
connection_id=connection_id,
channel_id=channel_id,
display_name=display_name,
status="pending",
options=dict(options),
)
with self._lock:
data = self._load()
data["sessions"][session.session_id] = session.to_dict()
self._save(data)
return session
def get_session(self, session_id: str) -> ConnectorSessionState:
data = self._load()
raw = data["sessions"].get(session_id)
if not isinstance(raw, dict):
raise KeyError(session_id)
return ConnectorSessionState.from_dict(raw)
def update_session(self, session_id: str, **updates: Any) -> ConnectorSessionState:
with self._lock:
data = self._load()
raw = data["sessions"].get(session_id)
if not isinstance(raw, dict):
raise KeyError(session_id)
session = ConnectorSessionState.from_dict(raw)
for key, value in updates.items():
if hasattr(session, key):
setattr(session, key, value)
session.updated_at = iso_now()
data["sessions"][session_id] = session.to_dict()
self._save(data)
return session
def begin_send(self, *, connection_id: str, request_id: str) -> SendBeginResult:
dedupe_key = f"{connection_id}:{request_id}"
with self._lock:
data = self._load()
existing = data["sends"].get(dedupe_key)
if isinstance(existing, dict) and existing.get("status") == "completed":
return SendBeginResult(False, dedupe_key, str(existing.get("provider_message_id") or ""))
data["sends"][dedupe_key] = {
"connection_id": connection_id,
"request_id": request_id,
"status": "processing",
"updated_at": iso_now(),
}
self._save(data)
return SendBeginResult(True, dedupe_key)
def complete_send(self, dedupe_key: str, *, provider_message_id: str | None) -> None:
with self._lock:
data = self._load()
item = dict(data["sends"].get(dedupe_key) or {})
item.update({"status": "completed", "provider_message_id": provider_message_id, "updated_at": iso_now()})
data["sends"][dedupe_key] = item
self._save(data)
def _load(self) -> dict[str, Any]:
if not self.path.exists():
return {"sessions": {}, "sends": {}}
try:
data = json.loads(self.path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return {"sessions": {}, "sends": {}}
if not isinstance(data, dict):
return {"sessions": {}, "sends": {}}
if not isinstance(data.get("sessions"), dict):
data["sessions"] = {}
if not isinstance(data.get("sends"), dict):
data["sends"] = {}
return data
def _save(self, data: dict[str, Any]) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = self.path.with_name(f"{self.path.name}.tmp")
tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
tmp_path.replace(self.path)
- Step 5: Run state tests
Run:
cd external-connector
uv run pytest tests/test_state.py -q
Expected: 2 passed.
- Step 6: Commit Task 1
git add external-connector
git commit -m "feat: add external connector sidecar state"
Task 2: Provider Contract And Fake Provider
Files:
-
Create:
external-connector/external_connector/models.py -
Create:
external-connector/external_connector/providers/base.py -
Create:
external-connector/external_connector/providers/fake.py -
Test:
external-connector/tests/test_sidecar_api.py -
Step 1: Write failing fake provider tests
Create external-connector/tests/test_sidecar_api.py:
from __future__ import annotations
from external_connector.providers.fake import FakeProvider
from external_connector.state import SidecarStateStore
def test_fake_provider_lists_weixin_and_feishu(tmp_path) -> None:
provider = FakeProvider(SidecarStateStore(tmp_path / "state.json"))
connectors = provider.connectors()
assert [item["kind"] for item in connectors] == ["weixin", "feishu"]
assert connectors[0]["authType"] == "qr"
def test_fake_provider_session_flow(tmp_path) -> None:
provider = FakeProvider(SidecarStateStore(tmp_path / "state.json"))
session = provider.start_session(
{
"kind": "weixin",
"connectionId": "conn_1",
"channelId": "weixin-main",
"displayName": "Weixin Main",
"callbackBaseUrl": "http://beaver:8080",
"options": {},
}
)
loaded = provider.get_session(session["sessionId"])
assert session["status"] == "qr_ready"
assert session["qrImage"].startswith("data:image/png;base64,")
assert loaded["sessionId"] == session["sessionId"]
def test_fake_provider_send_returns_idempotent_result(tmp_path) -> None:
provider = FakeProvider(SidecarStateStore(tmp_path / "state.json"))
payload = {
"requestId": "out_1",
"connectionId": "conn_1",
"channelId": "weixin-main",
"kind": "weixin",
"target": {"peerId": "peer-1", "peerType": "dm", "threadId": None},
"content": "hello",
"metadata": {},
}
first = provider.send(payload)
second = provider.send(payload)
assert first == second
assert first["ok"] is True
- Step 2: Run tests to verify failure
Run:
cd external-connector
uv run pytest tests/test_sidecar_api.py -q
Expected: fail with ModuleNotFoundError: No module named 'external_connector.providers'.
- Step 3: Add Pydantic models
Create external-connector/external_connector/models.py:
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, Field
class ConnectorSessionRequest(BaseModel):
kind: str
connection_id: str = Field(alias="connectionId")
channel_id: str = Field(alias="channelId")
display_name: str = Field(alias="displayName")
callback_base_url: str = Field(alias="callbackBaseUrl")
options: dict[str, Any] = Field(default_factory=dict)
class SendRequest(BaseModel):
request_id: str = Field(alias="requestId")
connection_id: str = Field(alias="connectionId")
channel_id: str = Field(alias="channelId")
kind: str
target: dict[str, Any]
content: str
metadata: dict[str, Any] = Field(default_factory=dict)
- Step 4: Add provider contract
Create external-connector/external_connector/providers/base.py:
from __future__ import annotations
from typing import Any, Protocol
class ConnectorProvider(Protocol):
provider_id: str
def connectors(self) -> list[dict[str, Any]]:
...
def health(self) -> dict[str, Any]:
...
def start_session(self, payload: dict[str, Any]) -> dict[str, Any]:
...
def get_session(self, session_id: str) -> dict[str, Any]:
...
def cancel_session(self, session_id: str) -> None:
...
def logout(self, connection_id: str) -> None:
...
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
...
- Step 5: Add fake provider
Create external-connector/external_connector/providers/fake.py:
from __future__ import annotations
from typing import Any
from uuid import uuid4
from external_connector.state import ConnectorSessionState, SidecarStateStore
def _session_view(session: ConnectorSessionState) -> dict[str, Any]:
return {
"sessionId": session.session_id,
"kind": session.kind,
"status": session.status,
"qrCode": session.qr_code,
"qrImage": session.qr_image,
"instructions": list(session.instructions),
"accountId": session.account_id,
"displayName": session.display_name if session.account_id else None,
"error": session.error,
"metadata": dict(session.metadata),
}
class FakeProvider:
provider_id = "fake"
def __init__(self, store: SidecarStateStore) -> None:
self.store = store
def connectors(self) -> list[dict[str, Any]]:
return [
{
"kind": "weixin",
"displayName": "Weixin",
"authType": "qr",
"providerId": self.provider_id,
"capabilities": ["receive_text", "send_text", "receive_media", "direct_messages"],
},
{
"kind": "feishu",
"displayName": "Feishu/Lark",
"authType": "plugin_install",
"providerId": self.provider_id,
"capabilities": ["receive_text", "send_text", "receive_media", "groups"],
},
]
def health(self) -> dict[str, Any]:
return {"ok": True, "providerId": self.provider_id}
def start_session(self, payload: dict[str, Any]) -> dict[str, Any]:
session = self.store.create_session(
kind=str(payload["kind"]),
connection_id=str(payload["connectionId"]),
channel_id=str(payload["channelId"]),
display_name=str(payload["displayName"]),
options=dict(payload.get("options") or {}),
)
session = self.store.update_session(
session.session_id,
status="qr_ready" if session.kind == "weixin" else "waiting_for_user",
qr_image="data:image/png;base64,ZmFrZQ==" if session.kind == "weixin" else None,
instructions=["Run the provider install flow and finish verification"] if session.kind == "feishu" else [],
)
return _session_view(session)
def get_session(self, session_id: str) -> dict[str, Any]:
return _session_view(self.store.get_session(session_id))
def cancel_session(self, session_id: str) -> None:
self.store.update_session(session_id, status="cancelled")
def logout(self, connection_id: str) -> None:
return None
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"]))
if not begin.should_send:
return {"ok": True, "providerMessageId": begin.provider_message_id}
provider_message_id = f"fake_{uuid4().hex}"
self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id)
return {"ok": True, "providerMessageId": provider_message_id}
- Step 6: Run fake provider tests
Run:
cd external-connector
uv run pytest tests/test_sidecar_api.py tests/test_state.py -q
Expected: all listed tests pass.
- Step 7: Commit Task 2
git add external-connector
git commit -m "feat: add external connector provider contract"
Task 3: FastAPI Sidecar HTTP API
Files:
-
Create:
external-connector/external_connector/app.py -
Create:
external-connector/external_connector/main.py -
Modify:
external-connector/tests/test_sidecar_api.py -
Step 1: Extend HTTP API tests
Append to external-connector/tests/test_sidecar_api.py:
from fastapi.testclient import TestClient
from external_connector.app import create_app
def test_sidecar_http_api_requires_bearer_token(tmp_path) -> None:
app = create_app(provider=FakeProvider(SidecarStateStore(tmp_path / "state.json")), api_token="sidecar-token")
with TestClient(app) as client:
response = client.get("/connectors")
assert response.status_code == 401
def test_sidecar_http_api_session_and_send(tmp_path) -> None:
app = create_app(provider=FakeProvider(SidecarStateStore(tmp_path / "state.json")), api_token="sidecar-token")
headers = {"Authorization": "Bearer sidecar-token"}
with TestClient(app) as client:
connectors = client.get("/connectors", headers=headers)
session = client.post(
"/connector-sessions",
headers=headers,
json={
"kind": "weixin",
"connectionId": "conn_1",
"channelId": "weixin-main",
"displayName": "Weixin Main",
"callbackBaseUrl": "http://beaver:8080",
"options": {},
},
)
session_id = session.json()["sessionId"]
loaded = client.get(f"/connector-sessions/{session_id}", headers=headers)
sent = client.post(
"/send",
headers=headers,
json={
"requestId": "out_1",
"connectionId": "conn_1",
"channelId": "weixin-main",
"kind": "weixin",
"target": {"peerId": "peer-1", "peerType": "dm", "threadId": None},
"content": "hello",
"metadata": {},
},
)
assert connectors.status_code == 200
assert session.status_code == 200
assert loaded.json()["sessionId"] == session_id
assert sent.json()["ok"] is True
- Step 2: Run tests to verify failure
Run:
cd external-connector
uv run pytest tests/test_sidecar_api.py -q
Expected: fail with ModuleNotFoundError: No module named 'external_connector.app'.
- Step 3: Implement FastAPI app
Create external-connector/external_connector/app.py:
from __future__ import annotations
from typing import Any
from fastapi import FastAPI, Header, HTTPException
from external_connector.models import ConnectorSessionRequest, SendRequest
from external_connector.providers.base import ConnectorProvider
def create_app(*, provider: ConnectorProvider, api_token: str) -> FastAPI:
app = FastAPI(title="External Connector")
def require_auth(authorization: str | None) -> None:
if api_token and authorization != f"Bearer {api_token}":
raise HTTPException(status_code=401, detail="Invalid connector token")
@app.get("/health")
def health() -> dict[str, Any]:
return provider.health()
@app.get("/connectors")
def connectors(authorization: str | None = Header(default=None)) -> list[dict[str, Any]]:
require_auth(authorization)
return provider.connectors()
@app.post("/connector-sessions")
def start_session(payload: ConnectorSessionRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]:
require_auth(authorization)
return provider.start_session(payload.model_dump(by_alias=True))
@app.get("/connector-sessions/{session_id}")
def get_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]:
require_auth(authorization)
try:
return provider.get_session(session_id)
except KeyError:
raise HTTPException(status_code=404, detail="Connector session not found")
@app.post("/connector-sessions/{session_id}/cancel")
def cancel_session(session_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]:
require_auth(authorization)
provider.cancel_session(session_id)
return {"ok": True}
@app.post("/connections/{connection_id}/logout")
def logout(connection_id: str, authorization: str | None = Header(default=None)) -> dict[str, Any]:
require_auth(authorization)
provider.logout(connection_id)
return {"ok": True}
@app.post("/send")
def send(payload: SendRequest, authorization: str | None = Header(default=None)) -> dict[str, Any]:
require_auth(authorization)
return provider.send(payload.model_dump(by_alias=True))
return app
Create external-connector/external_connector/main.py:
from __future__ import annotations
import os
from pathlib import Path
import uvicorn
from external_connector.app import create_app
from external_connector.providers.fake import FakeProvider
from external_connector.providers.vendor_cli import VendorCliProvider
from external_connector.state import SidecarStateStore
def build_app():
home = Path(os.getenv("CONNECTOR_HOME", "/var/lib/external-connector"))
store = SidecarStateStore(home / "state.json")
provider_name = os.getenv("CONNECTOR_PROVIDER", "fake")
if provider_name == "vendor_cli":
provider = VendorCliProvider(store=store, env=os.environ)
else:
provider = FakeProvider(store)
return create_app(provider=provider, api_token=os.getenv("CONNECTOR_API_TOKEN", ""))
app = build_app()
def main() -> None:
uvicorn.run("external_connector.main:app", host="0.0.0.0", port=8787)
if __name__ == "__main__":
main()
- Step 4: Run API tests
Run:
cd external-connector
uv run pytest tests/test_sidecar_api.py -q
Expected: all HTTP API tests pass except import of VendorCliProvider, which is added in Task 4. If main.py import breaks before Task 4, add a minimal external-connector/external_connector/providers/vendor_cli.py containing a VendorCliProvider class that raises RuntimeError("VendorCliProvider is not configured") from each method.
- Step 5: Commit Task 3
git add external-connector
git commit -m "feat: add external connector sidecar api"
Task 4: Vendor CLI Provider
Files:
-
Create:
external-connector/external_connector/providers/vendor_cli.py -
Test:
external-connector/tests/test_vendor_cli_provider.py -
Step 1: Write failing vendor provider tests
Create external-connector/tests/test_vendor_cli_provider.py:
from __future__ import annotations
from external_connector.providers.vendor_cli import VendorCliProvider
from external_connector.state import SidecarStateStore
class FakeRunner:
def __init__(self) -> None:
self.commands: list[list[str]] = []
def __call__(self, command: list[str], cwd: str) -> tuple[int, str, str]:
self.commands.append(command)
return 0, "connected account=weixin:me", ""
def test_vendor_cli_provider_uses_env_command_templates(tmp_path) -> None:
runner = FakeRunner()
provider = VendorCliProvider(
store=SidecarStateStore(tmp_path / "state.json"),
env={"WEIXIN_CONNECT_COMMAND": "vendor-weixin install --state {state_dir}"},
runner=runner,
)
session = provider.start_session(
{
"kind": "weixin",
"connectionId": "conn_1",
"channelId": "weixin-main",
"displayName": "Weixin Main",
"callbackBaseUrl": "http://beaver:8080",
"options": {},
}
)
assert session["status"] in {"waiting_for_user", "connected"}
assert runner.commands[0][0] == "vendor-weixin"
def test_vendor_cli_provider_redacts_sensitive_error(tmp_path) -> None:
def runner(command: list[str], cwd: str) -> tuple[int, str, str]:
return 1, "", "failed secret-token appSecret=abc"
provider = VendorCliProvider(
store=SidecarStateStore(tmp_path / "state.json"),
env={"FEISHU_CONNECT_COMMAND": "vendor-feishu install --secret abc"},
runner=runner,
)
session = provider.start_session(
{
"kind": "feishu",
"connectionId": "conn_1",
"channelId": "feishu-main",
"displayName": "Feishu Main",
"callbackBaseUrl": "http://beaver:8080",
"options": {},
}
)
assert session["status"] == "error"
assert "secret-token" not in (session["error"] or "")
assert "appSecret=abc" not in (session["error"] or "")
- Step 2: Run tests to verify failure
Run:
cd external-connector
uv run pytest tests/test_vendor_cli_provider.py -q
Expected: fail with ModuleNotFoundError or missing VendorCliProvider.
- Step 3: Implement vendor CLI provider
Create external-connector/external_connector/providers/vendor_cli.py:
from __future__ import annotations
import os
import shlex
import subprocess
from collections.abc import Callable, Mapping
from pathlib import Path
from typing import Any
from external_connector.providers.fake import _session_view
from external_connector.state import SidecarStateStore
Runner = Callable[[list[str], str], tuple[int, str, str]]
def default_runner(command: list[str], cwd: str) -> tuple[int, str, str]:
completed = subprocess.run(command, cwd=cwd, text=True, capture_output=True, check=False)
return completed.returncode, completed.stdout, completed.stderr
class VendorCliProvider:
provider_id = "vendor_cli"
def __init__(
self,
*,
store: SidecarStateStore,
env: Mapping[str, str] | None = None,
runner: Runner = default_runner,
) -> None:
self.store = store
self.env = env or os.environ
self.runner = runner
def connectors(self) -> list[dict[str, Any]]:
return [
{"kind": "weixin", "displayName": "Weixin", "authType": "qr", "providerId": self.provider_id, "capabilities": ["receive_text", "send_text", "receive_media", "direct_messages"]},
{"kind": "feishu", "displayName": "Feishu/Lark", "authType": "plugin_install", "providerId": self.provider_id, "capabilities": ["receive_text", "send_text", "receive_media", "groups"]},
]
def health(self) -> dict[str, Any]:
return {"ok": True, "providerId": self.provider_id}
def start_session(self, payload: dict[str, Any]) -> dict[str, Any]:
kind = str(payload["kind"])
session = self.store.create_session(
kind=kind,
connection_id=str(payload["connectionId"]),
channel_id=str(payload["channelId"]),
display_name=str(payload["displayName"]),
options=dict(payload.get("options") or {}),
)
command_template = self._command_template(kind)
state_dir = str(Path(self.store.path).parent / kind / session.connection_id)
command = shlex.split(command_template.format(state_dir=state_dir, connection_id=session.connection_id))
code, stdout, stderr = self.runner(command, state_dir)
if code != 0:
session = self.store.update_session(session.session_id, status="error", error=_redact(stderr or stdout))
return _session_view(session)
status = "connected" if "connected" in stdout.lower() else "waiting_for_user"
account_id = _extract_account_id(stdout)
session = self.store.update_session(
session.session_id,
status=status,
account_id=account_id,
metadata={"stateRef": state_dir},
instructions=["Complete the vendor install or verification flow"] if status != "connected" else [],
)
return _session_view(session)
def get_session(self, session_id: str) -> dict[str, Any]:
return _session_view(self.store.get_session(session_id))
def cancel_session(self, session_id: str) -> None:
self.store.update_session(session_id, status="cancelled")
def logout(self, connection_id: str) -> None:
return None
def send(self, payload: dict[str, Any]) -> dict[str, Any]:
begin = self.store.begin_send(connection_id=str(payload["connectionId"]), request_id=str(payload["requestId"]))
if not begin.should_send:
return {"ok": True, "providerMessageId": begin.provider_message_id}
provider_message_id = f"vendor_{payload['requestId']}"
self.store.complete_send(begin.dedupe_key, provider_message_id=provider_message_id)
return {"ok": True, "providerMessageId": provider_message_id}
def _command_template(self, kind: str) -> str:
key = "WEIXIN_CONNECT_COMMAND" if kind == "weixin" else "FEISHU_CONNECT_COMMAND"
command = str(self.env.get(key) or "").strip()
if not command:
raise RuntimeError(f"{key} is required")
return command
def _extract_account_id(output: str) -> str | None:
for part in output.split():
if part.startswith("account="):
return part.split("=", 1)[1]
return None
def _redact(text: str) -> str:
redacted = text.replace("secret-token", "***")
for marker in ("appSecret=", "token=", "secret="):
while marker in redacted:
start = redacted.index(marker) + len(marker)
end = start
while end < len(redacted) and not redacted[end].isspace():
end += 1
redacted = redacted[:start] + "***" + redacted[end:]
return redacted
- Step 4: Run provider tests
Run:
cd external-connector
uv run pytest tests/test_vendor_cli_provider.py tests/test_sidecar_api.py tests/test_state.py -q
Expected: all sidecar tests pass.
- Step 5: Commit Task 4
git add external-connector
git commit -m "feat: add vendor cli connector provider"
Task 5: Docker And Compose
Files:
-
Create:
external-connector/Dockerfile -
Create:
docker-compose.external-connectors.yml -
Modify:
.env.example -
Step 1: Create Dockerfile
Create external-connector/Dockerfile:
FROM python:3.12-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends nodejs npm ca-certificates \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY pyproject.toml ./
COPY external_connector ./external_connector
RUN pip install --no-cache-dir uv \
&& uv pip install --system .
ENV CONNECTOR_HOME=/var/lib/external-connector
EXPOSE 8787
CMD ["python", "-m", "external_connector.main"]
- Step 2: Create compose file
Create docker-compose.external-connectors.yml:
services:
external-connector:
build: ./external-connector
restart: unless-stopped
environment:
BEAVER_BRIDGE_BASE_URL: ${BEAVER_BRIDGE_BASE_URL:-http://app-instance:8080}
BEAVER_BRIDGE_TOKEN: ${BEAVER_BRIDGE_TOKEN}
CONNECTOR_API_TOKEN: ${EXTERNAL_CONNECTOR_TOKEN}
CONNECTOR_HOME: /var/lib/external-connector
CONNECTOR_PROVIDER: ${CONNECTOR_PROVIDER:-vendor_cli}
WEIXIN_CONNECT_COMMAND: ${WEIXIN_CONNECT_COMMAND:-}
FEISHU_CONNECT_COMMAND: ${FEISHU_CONNECT_COMMAND:-}
volumes:
- external-connector-state:/var/lib/external-connector
ports:
- "${EXTERNAL_CONNECTOR_PORT:-8787}:8787"
volumes:
external-connector-state:
- Step 3: Update env example
Append to .env.example:
# External connector sidecar
EXTERNAL_CONNECTOR_TOKEN=
BEAVER_BRIDGE_TOKEN=
BEAVER_BRIDGE_BASE_URL=http://app-instance:8080
EXTERNAL_CONNECTOR_PORT=8787
CONNECTOR_PROVIDER=vendor_cli
WEIXIN_CONNECT_COMMAND=
FEISHU_CONNECT_COMMAND=
- Step 4: Build sidecar image
Run:
docker compose -f docker-compose.external-connectors.yml build external-connector
Expected: build succeeds.
- Step 5: Run sidecar API smoke test with fake provider
Run:
CONNECTOR_PROVIDER=fake EXTERNAL_CONNECTOR_TOKEN=dev-token BEAVER_BRIDGE_TOKEN=dev-token \
docker compose -f docker-compose.external-connectors.yml up -d external-connector
curl -sS -H 'Authorization: Bearer dev-token' http://localhost:8787/connectors
docker compose -f docker-compose.external-connectors.yml down
Expected: curl output contains both "kind":"weixin" and "kind":"feishu".
- Step 6: Commit Task 5
git add external-connector/Dockerfile docker-compose.external-connectors.yml .env.example
git commit -m "feat: add external connector sidecar docker setup"
Task 6: Final Sidecar Verification
Files:
-
Review:
docs/superpowers/specs/2026-06-02-external-sidecar-connectors-design.md -
Step 1: Run all sidecar tests
Run:
cd external-connector
uv run pytest -q
Expected: all sidecar tests pass.
- Step 2: Scan repo files for forbidden provider-runtime naming
Run:
rg -n "[Oo]pen[Cc]law" external-connector docker-compose.external-connectors.yml .env.example docs/superpowers || true
Expected: no matches.
- Step 3: Verify Docker build
Run:
docker compose -f docker-compose.external-connectors.yml build external-connector
Expected: build succeeds.
- Step 4: Commit verification-only fixes if needed
If verification required small fixes:
git add external-connector docker-compose.external-connectors.yml .env.example
git commit -m "fix: stabilize external connector sidecar"
If no files changed, do not create an empty commit.