# Channel Connectors Foundation Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build the first connector slice: durable channel connections, pairing/credential primitives, connector registry, Telegram token connector, and backend APIs that materialize a runtime channel without manual JSON editing.

**Architecture:** Add a connector layer under `beaver/interfaces/channels/connections/` while keeping `ChannelRuntime`, `MessageBus`, and existing adapters as the message path. A `ChannelConnectionStore` persists setup state, a small credential vault stores secrets by reference, and `ChannelConnectorRegistry` materializes enabled connections into `ChannelConfig` objects during app startup. The first concrete connector is Telegram because token validation and runtime materialization are simple and testable with fake clients.

**Tech Stack:** Python dataclasses, FastAPI, Pydantic v2, local JSON stores, pytest, existing Beaver channel runtime.

---
## Scope
This plan implements phase 1 of `docs/superpowers/specs/2026-06-02-channel-connectors-and-pairing-design.md`.
Included:
- `ChannelConnection` data model and persistent JSON store.
- Restricted local credential store with secret redaction.
- One-time pairing token store. It is implemented and tested in this phase but not exposed through APIs; future terminal and QR connectors will consume it.
- Connector protocol and registry.
- Telegram connector with fake-client test hooks.
- Connection control APIs.
- App startup materialization from connections into `ChannelRuntime`.
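The one-time, expiring behavior expected from the pairing token store can be illustrated in isolation. This is a minimal in-memory sketch, not the plan's `PairingTokenStore`: the names `issue_token` and `consume_token` are hypothetical, and `secrets.token_urlsafe` is an assumed token source chosen for the illustration.

```python
import secrets
import time

# Hypothetical in-memory registry: token -> {"kind", "expires_at", "used"}.
_sessions: dict = {}


def issue_token(kind: str, ttl_seconds: float) -> str:
    """Create a pending token that expires ttl_seconds from now."""
    token = secrets.token_urlsafe(24)
    _sessions[token] = {"kind": kind, "expires_at": time.time() + ttl_seconds, "used": False}
    return token


def consume_token(token: str, expected_kind: str) -> bool:
    """Accept a token at most once, only for its kind, and only before expiry."""
    session = _sessions.get(token)
    if session is None or session["kind"] != expected_kind:
        return False
    if session["used"] or session["expires_at"] <= time.time():
        return False
    session["used"] = True
    return True
```

A second `consume_token` call with the same token returns `False`, which is the property the store tests in Task 1 check against the persistent implementation.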
Excluded from this plan:
- Terminal authenticated pairing.
- Feishu/Lark official SDK integration.
- Weixin external connector process.
- QQBot connector.
- Frontend connection wizard.
- Hot starting/stopping adapters without backend restart.
- Multi-process-safe storage. The JSON stores use `threading.Lock` plus atomic file replace for the single backend process used in phase 1. Production multi-worker deployment needs a file lock or database-backed store.
- Credential garbage collection. Updating secrets writes a new credential reference and leaves the old reference in the local credential file until a later cleanup pass.
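A later cleanup pass for orphaned credentials could be as simple as listing the references that no connection points at. The sketch below is an assumption about how such a pass might look, since this plan does not define one. `unreferenced_credentials` is a hypothetical helper, and the plain dicts stand in for the inner `connections` and `credentials` maps of the two JSON files.

```python
def unreferenced_credentials(connections: dict, credentials: dict) -> list:
    """Return credential refs that no connection points at, in sorted order."""
    in_use = {
        item.get("credentials_ref")
        for item in connections.values()
        if item.get("credentials_ref")
    }
    return sorted(ref for ref in credentials if ref not in in_use)


# Illustrative data: cred_old was superseded when the secret was updated.
connections = {"conn_1": {"credentials_ref": "cred_new"}}
credentials = {"cred_old": {"values": {}}, "cred_new": {"values": {}}}
stale = unreferenced_credentials(connections, credentials)
```

Here `stale` contains only `cred_old`. A real pass would also need to hold the store locks while deleting, which this sketch leaves out.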
## File Structure
- Create `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`
  - Exports connection models, stores, connector registry, and Telegram connector.
- Create `app-instance/backend/beaver/interfaces/channels/connections/models.py`
  - Dataclasses and constants for `ChannelConnection`, `PairingSession`, `ChannelRuntimeSpec`, `ValidationResult`.
- Create `app-instance/backend/beaver/interfaces/channels/connections/store.py`
  - JSON-backed `ChannelConnectionStore`, `CredentialStore`, and `PairingTokenStore`.
- Create `app-instance/backend/beaver/interfaces/channels/connections/connectors.py`
  - `ChannelConnector` protocol and `ChannelConnectorRegistry`.
- Create `app-instance/backend/beaver/interfaces/channels/connections/telegram.py`
  - Telegram token connector that validates via injected client factory and materializes a runtime spec.
- Modify `app-instance/backend/beaver/interfaces/web/schemas/chat.py`
  - Add Pydantic request/response models for connection APIs.
- Modify `app-instance/backend/beaver/interfaces/web/schemas/__init__.py`
  - Export the new schemas.
- Modify `app-instance/backend/beaver/interfaces/web/app.py`
  - Instantiate connection stores/registry, expose `/api/channel-connectors` and `/api/channel-connections` APIs, and merge materialized connection configs into runtime startup.
- Create `app-instance/backend/tests/unit/test_channel_connection_store.py`
  - Store, credential redaction, and pairing token tests.
- Create `app-instance/backend/tests/unit/test_channel_connector_registry.py`
  - Registry dispatch and runtime materialization tests.
- Create `app-instance/backend/tests/unit/test_telegram_channel_connector.py`
  - Telegram validation/materialization tests with fake client.
- Create `app-instance/backend/tests/unit/test_channel_connection_api.py`
  - FastAPI endpoint tests with fake service/app context.
---
### Task 1: Connection Models And Store
**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`
- Create: `app-instance/backend/beaver/interfaces/channels/connections/models.py`
- Create: `app-instance/backend/beaver/interfaces/channels/connections/store.py`
- Test: `app-instance/backend/tests/unit/test_channel_connection_store.py`
- [ ] **Step 1: Write failing store tests**
Create `app-instance/backend/tests/unit/test_channel_connection_store.py`:
```python
from __future__ import annotations

from beaver.interfaces.channels.connections import (
    ChannelConnectionStore,
    CredentialStore,
    PairingTokenStore,
)


def test_channel_connection_store_creates_updates_lists_and_revokes(tmp_path) -> None:
    store = ChannelConnectionStore(tmp_path / "connections.json")
    created = store.create(
        kind="telegram",
        mode="polling",
        display_name="Telegram Main",
        account_id="telegram:bot-main",
        owner_user_id="user-1",
        auth_type="token",
        runtime_config={"max_message_chars": 4096},
        capabilities=["receive_text", "send_text"],
    )
    updated = store.update_status(created.connection_id, status="connected", last_error=None)
    revoked = store.revoke(created.connection_id)

    assert created.connection_id
    assert created.channel_id.startswith("telegram-")
    assert created.status == "draft"
    assert updated.status == "connected"
    assert revoked.status == "revoked"
    assert store.get(created.connection_id).status == "revoked"
    assert [item.connection_id for item in store.list()] == [created.connection_id]


def test_credential_store_saves_values_by_reference_and_redacts_views(tmp_path) -> None:
    store = CredentialStore(tmp_path / "credentials.json")
    ref = store.put(kind="telegram", values={"botToken": "secret-token", "empty": ""})

    assert ref.startswith("cred_")
    assert store.get(ref) == {"botToken": "secret-token"}
    assert store.redacted(ref) == {"botToken": "***"}


def test_pairing_token_store_uses_one_time_expiring_tokens(tmp_path) -> None:
    store = PairingTokenStore(tmp_path / "pairing.json")
    session = store.create(kind="terminal", ttl_seconds=60, scope="channel:pair")
    consumed = store.consume(session.token, expected_kind="terminal")
    reused = store.consume(session.token, expected_kind="terminal")

    assert session.status == "pending"
    assert consumed is not None
    assert consumed.status == "consumed"
    assert reused is None


def test_pairing_token_store_rejects_expired_tokens(tmp_path) -> None:
    store = PairingTokenStore(tmp_path / "pairing.json")
    session = store.create(kind="weixin", ttl_seconds=-1, scope="channel:pair")

    assert store.consume(session.token, expected_kind="weixin") is None
```
- [ ] **Step 2: Run tests to verify they fail**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connection_store.py -q
```
Expected: fail during import with `ModuleNotFoundError: No module named 'beaver.interfaces.channels.connections'`.
- [ ] **Step 3: Implement connection dataclasses**
Create `app-instance/backend/beaver/interfaces/channels/connections/models.py`:
```python
"""Channel connection setup models."""
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any

CONNECTION_STATUSES = {"draft", "pairing", "connected", "running", "degraded", "error", "revoked"}


def iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(slots=True)
class ChannelConnection:
    connection_id: str
    owner_user_id: str | None
    channel_id: str
    kind: str
    mode: str
    display_name: str
    account_id: str
    status: str
    auth_type: str
    credentials_ref: str | None = None
    connector_ref: str | None = None
    pairing_session_id: str | None = None
    runtime_config: dict[str, Any] = field(default_factory=dict)
    capabilities: list[str] = field(default_factory=list)
    created_at: str = field(default_factory=iso_now)
    updated_at: str = field(default_factory=iso_now)
    last_seen_at: str | None = None
    last_error: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ChannelConnection":
        return cls(
            connection_id=str(data.get("connection_id") or ""),
            owner_user_id=_optional_string(data.get("owner_user_id")),
            channel_id=str(data.get("channel_id") or ""),
            kind=str(data.get("kind") or ""),
            mode=str(data.get("mode") or ""),
            display_name=str(data.get("display_name") or ""),
            account_id=str(data.get("account_id") or ""),
            status=str(data.get("status") or "draft"),
            auth_type=str(data.get("auth_type") or ""),
            credentials_ref=_optional_string(data.get("credentials_ref")),
            connector_ref=_optional_string(data.get("connector_ref")),
            pairing_session_id=_optional_string(data.get("pairing_session_id")),
            runtime_config=dict(data.get("runtime_config") or {}),
            capabilities=[str(item) for item in data.get("capabilities") or []],
            created_at=str(data.get("created_at") or iso_now()),
            updated_at=str(data.get("updated_at") or iso_now()),
            last_seen_at=_optional_string(data.get("last_seen_at")),
            last_error=_optional_string(data.get("last_error")),
        )


@dataclass(slots=True)
class PairingSession:
    pairing_session_id: str
    kind: str
    scope: str
    token: str
    status: str
    expires_at_ms: int
    created_at_ms: int

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PairingSession":
        return cls(
            pairing_session_id=str(data.get("pairing_session_id") or ""),
            kind=str(data.get("kind") or ""),
            scope=str(data.get("scope") or ""),
            token=str(data.get("token") or ""),
            status=str(data.get("status") or "pending"),
            expires_at_ms=int(data.get("expires_at_ms") or 0),
            created_at_ms=int(data.get("created_at_ms") or 0),
        )


@dataclass(slots=True)
class ChannelRuntimeSpec:
    channel_id: str
    kind: str
    mode: str
    account_id: str
    display_name: str
    config: dict[str, Any] = field(default_factory=dict)
    secrets_ref: str | None = None
    external_endpoint: str | None = None


@dataclass(slots=True)
class ValidationResult:
    ok: bool
    status: str
    account_id: str | None = None
    display_name: str | None = None
    error: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


def _optional_string(value: Any) -> str | None:
    if value is None:
        return None
    text = str(value).strip()
    return text or None
```
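`from_dict` is tolerant of missing or blank fields, and most of that tolerance comes from `_optional_string`. To make the normalization concrete, the snippet below copies that helper (with the return type spelled as `Optional[str]`) and runs it on a few inputs. The sample values are illustrative only.

```python
from typing import Any, Optional


def _optional_string(value: Any) -> Optional[str]:
    # Same logic as in models.py: None and blank strings both collapse to None.
    if value is None:
        return None
    text = str(value).strip()
    return text or None


samples = [None, "", "   ", " cred_123 ", 42]
normalized = [_optional_string(item) for item in samples]
```

Blank and whitespace-only values become `None`, surrounding whitespace is trimmed, and non-string values are stringified, so a hand-edited JSON file with `"credentials_ref": ""` loads the same way as one with the key missing.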
- [ ] **Step 4: Implement JSON stores**
Create `app-instance/backend/beaver/interfaces/channels/connections/store.py`:
```python
"""Persistent channel connection stores."""
from __future__ import annotations

import json
import time
from pathlib import Path
from threading import Lock
from typing import Any
from uuid import uuid4

from .models import CONNECTION_STATUSES, ChannelConnection, PairingSession, iso_now


class ChannelConnectionStore:
    def __init__(self, path: Path) -> None:
        self.path = Path(path)
        self._lock = Lock()

    def create(
        self,
        *,
        kind: str,
        mode: str,
        display_name: str,
        account_id: str,
        owner_user_id: str | None,
        auth_type: str,
        runtime_config: dict[str, Any] | None = None,
        capabilities: list[str] | None = None,
        credentials_ref: str | None = None,
    ) -> ChannelConnection:
        with self._lock:
            data = self._load()
            connection_id = f"conn_{uuid4().hex}"
            channel_id = f"{_slug(kind)}-{uuid4().hex[:8]}"
            now = iso_now()
            connection = ChannelConnection(
                connection_id=connection_id,
                owner_user_id=owner_user_id,
                channel_id=channel_id,
                kind=kind,
                mode=mode,
                display_name=display_name or channel_id,
                account_id=account_id,
                status="draft",
                auth_type=auth_type,
                credentials_ref=credentials_ref,
                runtime_config=runtime_config or {},
                capabilities=capabilities or [],
                created_at=now,
                updated_at=now,
            )
            data["connections"][connection_id] = connection.to_dict()
            self._save(data)
            return connection

    def get(self, connection_id: str) -> ChannelConnection:
        data = self._load()
        raw = data["connections"].get(connection_id)
        if not isinstance(raw, dict):
            raise KeyError(connection_id)
        return ChannelConnection.from_dict(raw)

    def list(self) -> list[ChannelConnection]:
        data = self._load()
        return [ChannelConnection.from_dict(item) for item in data["connections"].values() if isinstance(item, dict)]

    def update(self, connection: ChannelConnection) -> ChannelConnection:
        with self._lock:
            data = self._load()
            if connection.connection_id not in data["connections"]:
                raise KeyError(connection.connection_id)
            connection.updated_at = iso_now()
            data["connections"][connection.connection_id] = connection.to_dict()
            self._save(data)
            return connection

    def update_status(self, connection_id: str, *, status: str, last_error: str | None) -> ChannelConnection:
        if status not in CONNECTION_STATUSES:
            raise ValueError(f"Unsupported connection status: {status}")
        connection = self.get(connection_id)
        connection.status = status
        connection.last_error = last_error
        if status in {"connected", "running"}:
            connection.last_seen_at = iso_now()
        return self.update(connection)

    def revoke(self, connection_id: str) -> ChannelConnection:
        return self.update_status(connection_id, status="revoked", last_error=None)

    def _load(self) -> dict[str, Any]:
        if not self.path.exists():
            return {"connections": {}}
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {"connections": {}}
        if not isinstance(data, dict) or not isinstance(data.get("connections"), dict):
            return {"connections": {}}
        return data

    def _save(self, data: dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)


class CredentialStore:
    def __init__(self, path: Path) -> None:
        self.path = Path(path)
        self._lock = Lock()

    def put(self, *, kind: str, values: dict[str, Any]) -> str:
        cleaned = {str(key): str(value) for key, value in values.items() if str(key).strip() and str(value).strip()}
        ref = f"cred_{uuid4().hex}"
        with self._lock:
            data = self._load()
            data["credentials"][ref] = {"kind": kind, "values": cleaned, "created_at": iso_now()}
            self._save(data)
        return ref

    def get(self, ref: str) -> dict[str, str]:
        data = self._load()
        item = data["credentials"].get(ref)
        if not isinstance(item, dict):
            raise KeyError(ref)
        values = item.get("values")
        if not isinstance(values, dict):
            return {}
        return {str(key): str(value) for key, value in values.items()}

    def redacted(self, ref: str | None) -> dict[str, str]:
        if not ref:
            return {}
        try:
            values = self.get(ref)
        except KeyError:
            return {}
        return {key: "***" for key in values}

    def _load(self) -> dict[str, Any]:
        if not self.path.exists():
            return {"credentials": {}}
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {"credentials": {}}
        if not isinstance(data, dict) or not isinstance(data.get("credentials"), dict):
            return {"credentials": {}}
        return data

    def _save(self, data: dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)


class PairingTokenStore:
    def __init__(self, path: Path) -> None:
        self.path = Path(path)
        self._lock = Lock()

    def create(self, *, kind: str, ttl_seconds: int, scope: str) -> PairingSession:
        now_ms = _now_ms()
        session = PairingSession(
            pairing_session_id=f"pair_{uuid4().hex}",
            kind=kind,
            scope=scope,
            token=f"pair_{uuid4().hex}",
            status="pending",
            expires_at_ms=now_ms + int(ttl_seconds * 1000),
            created_at_ms=now_ms,
        )
        with self._lock:
            data = self._load()
            data["sessions"][session.pairing_session_id] = session.to_dict()
            self._save(data)
        return session

    def consume(self, token: str, *, expected_kind: str) -> PairingSession | None:
        with self._lock:
            data = self._load()
            for key, raw in data["sessions"].items():
                session = PairingSession.from_dict(raw)
                if session.token != token or session.kind != expected_kind:
                    continue
                if session.status != "pending" or session.expires_at_ms <= _now_ms():
                    return None
                session.status = "consumed"
                data["sessions"][key] = session.to_dict()
                self._save(data)
                return session
            return None

    def _load(self) -> dict[str, Any]:
        if not self.path.exists():
            return {"sessions": {}}
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {"sessions": {}}
        if not isinstance(data, dict) or not isinstance(data.get("sessions"), dict):
            return {"sessions": {}}
        return data

    def _save(self, data: dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)


def _now_ms() -> int:
    return int(time.time() * 1000)


def _slug(value: str) -> str:
    text = "".join(char if char.isalnum() else "-" for char in str(value).strip().lower())
    return "-".join(part for part in text.split("-") if part) or "channel"
```
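The `_save` methods write to a sibling temp file and then replace the target, which keeps readers from seeing a half-written file. Scope describes the credential store as restricted, but the code shown does not set file permissions. One possible hardening, offered as an assumption rather than as part of this plan, is to create the temp file with owner-only mode before the replace. `write_json_private` is a hypothetical helper, and the mode bits are only meaningful on POSIX systems.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_private(path: Path, data: dict) -> None:
    """Atomically write JSON so the file is readable only by its owner (POSIX)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # mkstemp creates the file with mode 0o600 in the target directory,
    # so the final os.replace stays on one filesystem and is atomic.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(json.dumps(data, ensure_ascii=False, indent=2) + "\n")
        os.replace(tmp_name, path)
    except BaseException:
        # Do not leave a stray temp file behind if the write fails.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

Using a unique temp name also avoids two writers clobbering the same `.tmp` file, although it does not replace the cross-process lock that Scope lists as out of phase 1.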
- [ ] **Step 5: Export the connection package**
Create `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`:
```python
"""Channel connection setup layer."""
from .models import ChannelConnection, ChannelRuntimeSpec, PairingSession, ValidationResult
from .store import ChannelConnectionStore, CredentialStore, PairingTokenStore

__all__ = [
    "ChannelConnection",
    "ChannelRuntimeSpec",
    "PairingSession",
    "ValidationResult",
    "ChannelConnectionStore",
    "CredentialStore",
    "PairingTokenStore",
]
```
- [ ] **Step 6: Run store tests to verify they pass**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connection_store.py -q
```
Expected: `4 passed`.
- [ ] **Step 7: Commit Task 1**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections app-instance/backend/tests/unit/test_channel_connection_store.py
git commit -m "feat: add channel connection store"
```
---
### Task 2: Connector Registry
**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/connections/connectors.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`
- Test: `app-instance/backend/tests/unit/test_channel_connector_registry.py`
- [ ] **Step 1: Write failing registry tests**
Create `app-instance/backend/tests/unit/test_channel_connector_registry.py`:
```python
from __future__ import annotations

import asyncio

from beaver.interfaces.channels.connections import (
    ChannelConnectionStore,
    ChannelConnectorRegistry,
    ChannelRuntimeSpec,
    CredentialStore,
    ValidationResult,
)


class FakeConnector:
    kind = "fake"

    def __init__(self) -> None:
        self.validated: list[str] = []
        self.revoked: list[str] = []

    async def validate(self, connection_id: str) -> ValidationResult:
        self.validated.append(connection_id)
        return ValidationResult(ok=True, status="connected", account_id="fake-account", display_name="Fake")

    async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec:
        return ChannelRuntimeSpec(
            channel_id="fake-channel",
            kind="fake",
            mode="webhook",
            account_id="fake-account",
            display_name="Fake",
            config={"enabled": True},
        )

    async def revoke(self, connection_id: str) -> None:
        self.revoked.append(connection_id)
        return None


def test_connector_registry_dispatches_by_kind(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        connector = FakeConnector()
        registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
        registry.register(connector)
        connection = connection_store.create(
            kind="fake",
            mode="webhook",
            display_name="Fake",
            account_id="fake-account",
            owner_user_id=None,
            auth_type="token",
        )
        result = await registry.validate(connection.connection_id)
        spec = await registry.materialize_runtime(connection.connection_id)

        assert result.ok is True
        assert connector.validated == [connection.connection_id]
        assert spec.channel_id == "fake-channel"

    asyncio.run(run())


def test_connector_registry_materializes_only_connected_connections(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
        registry.register(FakeConnector())
        draft = connection_store.create(
            kind="fake",
            mode="webhook",
            display_name="Draft",
            account_id="draft",
            owner_user_id=None,
            auth_type="token",
        )
        connected = connection_store.create(
            kind="fake",
            mode="webhook",
            display_name="Connected",
            account_id="connected",
            owner_user_id=None,
            auth_type="token",
        )
        connection_store.update_status(connected.connection_id, status="connected", last_error=None)
        specs = await registry.materialize_connected_runtime_specs()

        assert [spec.channel_id for spec in specs] == ["fake-channel"]
        assert connection_store.get(draft.connection_id).status == "draft"

    asyncio.run(run())


def test_connector_registry_revoke_calls_connector_and_updates_store(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        connector = FakeConnector()
        registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
        registry.register(connector)
        connection = connection_store.create(
            kind="fake",
            mode="webhook",
            display_name="Fake",
            account_id="fake-account",
            owner_user_id=None,
            auth_type="token",
        )
        connection_store.update_status(connection.connection_id, status="connected", last_error=None)
        await registry.revoke(connection.connection_id)

        assert connector.revoked == [connection.connection_id]
        assert connection_store.get(connection.connection_id).status == "revoked"

    asyncio.run(run())
```
- [ ] **Step 2: Run tests to verify they fail**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connector_registry.py -q
```
Expected: fail with `ImportError: cannot import name 'ChannelConnectorRegistry'`.
- [ ] **Step 3: Implement connector protocol and registry**
Create `app-instance/backend/beaver/interfaces/channels/connections/connectors.py`:
```python
"""Channel connector registry."""
from __future__ import annotations

from typing import Protocol

from .models import ChannelRuntimeSpec, ValidationResult
from .store import ChannelConnectionStore, CredentialStore


class ChannelConnector(Protocol):
    kind: str

    async def validate(self, connection_id: str) -> ValidationResult:
        ...

    async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec:
        ...

    async def revoke(self, connection_id: str) -> None:
        ...


class ChannelConnectorRegistry:
    def __init__(self, *, connection_store: ChannelConnectionStore, credential_store: CredentialStore) -> None:
        self.connection_store = connection_store
        self.credential_store = credential_store
        self._connectors: dict[str, ChannelConnector] = {}

    def register(self, connector: ChannelConnector) -> None:
        kind = connector.kind.strip()
        if not kind:
            raise ValueError("Connector kind is required")
        if kind in self._connectors:
            raise ValueError(f"Connector already registered: {kind}")
        self._connectors[kind] = connector

    def connectors(self) -> list[dict[str, str]]:
        return [{"kind": kind} for kind in sorted(self._connectors)]

    async def validate(self, connection_id: str) -> ValidationResult:
        connection = self.connection_store.get(connection_id)
        connector = self._connector(connection.kind)
        result = await connector.validate(connection_id)
        self.connection_store.update_status(
            connection_id,
            status=result.status,
            last_error=result.error,
        )
        return result

    async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec:
        connection = self.connection_store.get(connection_id)
        return await self._connector(connection.kind).materialize_runtime(connection_id)

    async def materialize_connected_runtime_specs(self) -> list[ChannelRuntimeSpec]:
        specs: list[ChannelRuntimeSpec] = []
        for connection in self.connection_store.list():
            if connection.status not in {"connected", "running"}:
                continue
            specs.append(await self._connector(connection.kind).materialize_runtime(connection.connection_id))
        return specs

    async def revoke(self, connection_id: str) -> None:
        connection = self.connection_store.get(connection_id)
        await self._connector(connection.kind).revoke(connection_id)
        self.connection_store.revoke(connection_id)

    def _connector(self, kind: str) -> ChannelConnector:
        connector = self._connectors.get(kind)
        if connector is None:
            raise KeyError(f"Connector not registered: {kind}")
        return connector
```
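Because `ChannelConnector` is a `typing.Protocol`, connectors satisfy it structurally: `FakeConnector` in the registry tests never inherits from it. The standalone sketch below illustrates that idea together with dispatch by `kind`. `MiniConnector`, `EchoConnector`, and `MiniRegistry` are hypothetical stand-ins, and `runtime_checkable` is added only to make the structural match visible through `isinstance`; the plan's protocol does not use it.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class MiniConnector(Protocol):
    kind: str

    async def validate(self, connection_id: str) -> bool: ...


class EchoConnector:
    # No inheritance from MiniConnector: matching attributes are enough.
    kind = "echo"

    async def validate(self, connection_id: str) -> bool:
        return connection_id.startswith("conn_")


class MiniRegistry:
    def __init__(self) -> None:
        self._connectors: dict = {}

    def register(self, connector: MiniConnector) -> None:
        # Reject a second connector for the same kind, as the registry above does.
        if connector.kind in self._connectors:
            raise ValueError(f"Connector already registered: {connector.kind}")
        self._connectors[connector.kind] = connector

    async def validate(self, kind: str, connection_id: str) -> bool:
        # Dispatch to whichever connector was registered for this kind.
        return await self._connectors[kind].validate(connection_id)


registry = MiniRegistry()
registry.register(EchoConnector())
is_connector = isinstance(EchoConnector(), MiniConnector)
result = asyncio.run(registry.validate("echo", "conn_123"))
```

The same property is what lets Task 3 add `TelegramConnector` without touching the registry: any object with a `kind` attribute and the three coroutine methods can be registered.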
- [ ] **Step 4: Export registry symbols**
Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`:
```python
"""Channel connection setup layer."""
from .connectors import ChannelConnector, ChannelConnectorRegistry
from .models import ChannelConnection, ChannelRuntimeSpec, PairingSession, ValidationResult
from .store import ChannelConnectionStore, CredentialStore, PairingTokenStore

__all__ = [
    "ChannelConnector",
    "ChannelConnectorRegistry",
    "ChannelConnection",
    "ChannelRuntimeSpec",
    "PairingSession",
    "ValidationResult",
    "ChannelConnectionStore",
    "CredentialStore",
    "PairingTokenStore",
]
```
- [ ] **Step 5: Run registry tests**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connector_registry.py -q
```
Expected: `3 passed`.
- [ ] **Step 6: Commit Task 2**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections app-instance/backend/tests/unit/test_channel_connector_registry.py
git commit -m "feat: add channel connector registry"
```
---
### Task 3: Telegram Connector
**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/connections/telegram.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`
- Test: `app-instance/backend/tests/unit/test_telegram_channel_connector.py`
- [ ] **Step 1: Write failing Telegram connector tests**
Create `app-instance/backend/tests/unit/test_telegram_channel_connector.py`:
```python
from __future__ import annotations

import asyncio

from beaver.interfaces.channels.connections import (
    ChannelConnectionStore,
    CredentialStore,
    TelegramConnector,
)


class FakeTelegramClient:
    async def get_me(self):
        return {"id": 12345, "username": "beaver_bot", "first_name": "Beaver"}


class BrokenTelegramClient:
    async def get_me(self):
        raise RuntimeError("invalid token")


def test_telegram_connector_validates_token_and_updates_connection(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        credentials_ref = credential_store.put(kind="telegram", values={"botToken": "token-1"})
        connection = connection_store.create(
            kind="telegram",
            mode="polling",
            display_name="Telegram Main",
            account_id="",
            owner_user_id="user-1",
            auth_type="token",
            credentials_ref=credentials_ref,
            runtime_config={"max_message_chars": 4096},
        )
        connector = TelegramConnector(
            connection_store=connection_store,
            credential_store=credential_store,
            client_factory=lambda token: FakeTelegramClient(),
        )
        result = await connector.validate(connection.connection_id)
        updated = connection_store.get(connection.connection_id)

        assert result.ok is True
        assert result.status == "connected"
        assert result.account_id == "telegram:12345"
        assert updated.account_id == "telegram:12345"
        assert updated.display_name == "Beaver (@beaver_bot)"
        assert updated.capabilities == ["receive_text", "send_text", "receive_media", "groups"]

    asyncio.run(run())


def test_telegram_connector_materializes_runtime_spec(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        credentials_ref = credential_store.put(kind="telegram", values={"botToken": "token-1"})
        connection = connection_store.create(
            kind="telegram",
            mode="polling",
            display_name="Telegram Main",
            account_id="telegram:12345",
            owner_user_id=None,
            auth_type="token",
            credentials_ref=credentials_ref,
            runtime_config={"max_message_chars": 4096, "require_mention_in_groups": True},
        )
        connection_store.update_status(connection.connection_id, status="connected", last_error=None)
        connector = TelegramConnector(
            connection_store=connection_store,
            credential_store=credential_store,
            client_factory=lambda token: FakeTelegramClient(),
        )
        spec = await connector.materialize_runtime(connection.connection_id)

        assert spec.channel_id == connection.channel_id
        assert spec.kind == "telegram"
        assert spec.mode == "polling"
        assert spec.account_id == "telegram:12345"
        assert spec.config["max_message_chars"] == 4096
        assert spec.config["require_mention_in_groups"] is True
        assert spec.secrets_ref == credentials_ref

    asyncio.run(run())


def test_telegram_connector_validation_failure_sets_error_status(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        credentials_ref = credential_store.put(kind="telegram", values={"botToken": "bad-token"})
        connection = connection_store.create(
            kind="telegram",
            mode="polling",
            display_name="Telegram Main",
            account_id="",
            owner_user_id=None,
            auth_type="token",
            credentials_ref=credentials_ref,
        )
        connector = TelegramConnector(
            connection_store=connection_store,
            credential_store=credential_store,
            client_factory=lambda token: BrokenTelegramClient(),
        )
        result = await connector.validate(connection.connection_id)

        assert result.ok is False
        assert result.status == "error"
        assert "invalid token" in (result.error or "")

    asyncio.run(run())


def test_telegram_connector_revoke_leaves_store_status_to_registry(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        connection = connection_store.create(
            kind="telegram",
            mode="polling",
            display_name="Telegram Main",
            account_id="telegram:12345",
            owner_user_id=None,
            auth_type="token",
        )
        connection_store.update_status(connection.connection_id, status="connected", last_error=None)
        connector = TelegramConnector(
            connection_store=connection_store,
            credential_store=credential_store,
            client_factory=lambda token: FakeTelegramClient(),
        )
        await connector.revoke(connection.connection_id)

        assert connection_store.get(connection.connection_id).status == "connected"

    asyncio.run(run())
```
- [ ] **Step 2: Run tests to verify they fail**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_connector.py -q
```
Expected: fail with `ImportError: cannot import name 'TelegramConnector'`.
- [ ] **Step 3: Verify Telegram dependency**
Run:
```bash
cd app-instance/backend
rg -n "python-telegram-bot" pyproject.toml uv.lock | sed -n '1,20p'
```
Expected output includes `python-telegram-bot>=22.0,<23.0`. With that version range, the default client factory can use `from telegram import Bot`, and `Bot.get_me()` is a coroutine that must be awaited.
- [ ] **Step 4: Implement TelegramConnector**
Create `app-instance/backend/beaver/interfaces/channels/connections/telegram.py`:
```python
"""Telegram channel connector."""
from __future__ import annotations

from collections.abc import Callable
from typing import Any

from .models import ChannelRuntimeSpec, ValidationResult
from .store import ChannelConnectionStore, CredentialStore


class TelegramConnector:
    kind = "telegram"

    def __init__(
        self,
        *,
        connection_store: ChannelConnectionStore,
        credential_store: CredentialStore,
        client_factory: Callable[[str], Any] | None = None,
    ) -> None:
        self.connection_store = connection_store
        self.credential_store = credential_store
        self.client_factory = client_factory or _default_client_factory

    async def validate(self, connection_id: str) -> ValidationResult:
        connection = self.connection_store.get(connection_id)
        token = self._bot_token(connection.credentials_ref)
        try:
            client = self.client_factory(token)
            raw = await client.get_me()
            bot_id = _value(raw, "id")
            username = _value(raw, "username")
            first_name = _value(raw, "first_name") or "Telegram Bot"
            account_id = f"telegram:{bot_id}" if bot_id else connection.account_id
            display_name = f"{first_name} (@{username})" if username else first_name
            connection.account_id = account_id
            connection.display_name = display_name
            connection.capabilities = ["receive_text", "send_text", "receive_media", "groups"]
            self.connection_store.update(connection)
            return ValidationResult(
                ok=True,
                status="connected",
                account_id=account_id,
                display_name=display_name,
                metadata={"username": username} if username else {},
            )
        except Exception as exc:
            return ValidationResult(ok=False, status="error", error=str(exc))

    async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec:
        connection = self.connection_store.get(connection_id)
        if connection.status not in {"connected", "running"}:
            raise ValueError(f"Connection is not connected: {connection.connection_id}")
        return ChannelRuntimeSpec(
            channel_id=connection.channel_id,
            kind=connection.kind,
            mode=connection.mode,
            account_id=connection.account_id,
            display_name=connection.display_name,
            config=dict(connection.runtime_config),
            secrets_ref=connection.credentials_ref,
        )

    async def revoke(self, connection_id: str) -> None:
        # Telegram bot tokens do not have a Beaver-managed platform revoke action.
        # The registry owns local connection state transitions.
        return None

    def _bot_token(self, credentials_ref: str | None) -> str:
        if not credentials_ref:
            raise ValueError("Telegram credentials are missing")
        token = self.credential_store.get(credentials_ref).get("botToken")
        if not token:
            raise ValueError("botToken is required")
        return token


def _value(raw: Any, key: str) -> str:
    if isinstance(raw, dict):
        value = raw.get(key)
    else:
        value = getattr(raw, key, None)
    return str(value).strip() if value is not None else ""


def _default_client_factory(token: str) -> Any:
    try:
        from telegram import Bot
    except ImportError as exc:  # pragma: no cover - optional live dependency
        raise RuntimeError("Install beaver-backend[telegram] to validate Telegram connections") from exc
    return Bot(token=token)
```
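The identity fields that `validate()` stores can be traced with a small standalone sketch. `describe_bot` is a hypothetical helper written for this illustration; it mirrors the string handling above but is not part of the connector, and the sample payloads are made up.

```python
from types import SimpleNamespace
from typing import Any


def _value(raw: Any, key: str) -> str:
    """Read a field from a dict or an attribute-style object as a stripped string."""
    value = raw.get(key) if isinstance(raw, dict) else getattr(raw, key, None)
    return str(value).strip() if value is not None else ""


def describe_bot(raw: Any, fallback_account_id: str = "") -> tuple[str, str]:
    """Return (account_id, display_name) following the derivation in validate()."""
    bot_id = _value(raw, "id")
    username = _value(raw, "username")
    first_name = _value(raw, "first_name") or "Telegram Bot"
    account_id = f"telegram:{bot_id}" if bot_id else fallback_account_id
    display_name = f"{first_name} (@{username})" if username else first_name
    return account_id, display_name


# A dict payload with all fields, and an object payload with none of them set.
from_dict = describe_bot({"id": 12345, "username": "beaver_bot", "first_name": "Beaver"})
from_object = describe_bot(SimpleNamespace(id=None, username=None, first_name=None), "telegram:old")
print(from_dict)
print(from_object)
```

The second call shows the fallbacks: with no bot id the existing account id is kept, and with no name the display name defaults to `Telegram Bot`.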
- [ ] **Step 5: Export TelegramConnector**
Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`:
```python
"""Channel connection setup layer."""
from .connectors import ChannelConnector, ChannelConnectorRegistry
from .models import ChannelConnection, ChannelRuntimeSpec, PairingSession, ValidationResult
from .store import ChannelConnectionStore, CredentialStore, PairingTokenStore
from .telegram import TelegramConnector
__all__ = [
    "ChannelConnector",
    "ChannelConnectorRegistry",
    "ChannelConnection",
    "ChannelRuntimeSpec",
    "PairingSession",
    "ValidationResult",
    "ChannelConnectionStore",
    "CredentialStore",
    "PairingTokenStore",
    "TelegramConnector",
]
```
- [ ] **Step 6: Run Telegram connector tests**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_connector.py -q
```
Expected: `4 passed`.
- [ ] **Step 7: Commit Task 3**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections app-instance/backend/tests/unit/test_telegram_channel_connector.py
git commit -m "feat: add telegram channel connector"
```
---
### Task 4: Runtime Materialization From Connections
**Files:**
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
- Test: `app-instance/backend/tests/unit/test_channel_connector_registry.py`
- [ ] **Step 1: Verify ChannelConfig fields**
Run:
```bash
cd app-instance/backend
uv run python - <<'PY'
from dataclasses import fields
from beaver.foundation.config.schema import ChannelConfig
print([field.name for field in fields(ChannelConfig)])
PY
```
Expected output includes `enabled`, `kind`, `mode`, `account_id`, `display_name`, `config`, and `secrets`.
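The visual check above can also be expressed as a set difference, which fails loudly if a field is renamed. The sketch below uses `ChannelConfigSketch`, a hypothetical stand-in with the field names listed in this step; in the real check, import `ChannelConfig` from `beaver.foundation.config.schema` instead. The default values are assumptions.

```python
from dataclasses import dataclass, field, fields
from typing import Any


@dataclass
class ChannelConfigSketch:
    """Hypothetical stand-in for ChannelConfig; only the field names matter here."""

    enabled: bool = False
    kind: str = ""
    mode: str = ""
    account_id: str = ""
    display_name: str = ""
    config: dict[str, Any] = field(default_factory=dict)
    secrets: dict[str, Any] = field(default_factory=dict)


# Field names that materialization passes as keyword arguments.
REQUIRED = {"enabled", "kind", "mode", "account_id", "display_name", "config", "secrets"}

present = {f.name for f in fields(ChannelConfigSketch)}
missing = REQUIRED - present
print(sorted(missing))
```

An empty `missing` set means every keyword used by the materialization step is accepted by the dataclass.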
- [ ] **Step 2: Extend registry tests for ChannelConfig materialization**
Append to `app-instance/backend/tests/unit/test_channel_connector_registry.py`:
```python
from beaver.foundation.config.schema import ChannelConfig


def test_connector_registry_materializes_channel_configs_with_credentials(tmp_path) -> None:
    async def run() -> None:
        connection_store = ChannelConnectionStore(tmp_path / "connections.json")
        credential_store = CredentialStore(tmp_path / "credentials.json")
        credentials_ref = credential_store.put(kind="telegram", values={"botToken": "token-1"})
        connection = connection_store.create(
            kind="fake",
            mode="webhook",
            display_name="Connected",
            account_id="connected",
            owner_user_id=None,
            auth_type="token",
            credentials_ref=credentials_ref,
        )
        connection_store.update_status(connection.connection_id, status="connected", last_error=None)

        class CredentialAwareConnector(FakeConnector):
            async def materialize_runtime(self, connection_id: str) -> ChannelRuntimeSpec:
                stored = connection_store.get(connection_id)
                return ChannelRuntimeSpec(
                    channel_id="fake-channel",
                    kind="fake",
                    mode="webhook",
                    account_id="fake-account",
                    display_name="Fake",
                    config={"enabled": True},
                    secrets_ref=stored.credentials_ref,
                )

        registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
        registry.register(CredentialAwareConnector())
        configs = await registry.materialize_channel_configs()
        assert isinstance(configs["fake-channel"], ChannelConfig)
        assert configs["fake-channel"].enabled is True
        assert configs["fake-channel"].secrets == {"botToken": "token-1"}

    asyncio.run(run())
```
- [ ] **Step 3: Run registry tests to verify failure**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connector_registry.py::test_connector_registry_materializes_channel_configs_with_credentials -q
```
Expected: fail with `AttributeError: 'ChannelConnectorRegistry' object has no attribute 'materialize_channel_configs'`.
- [ ] **Step 4: Implement channel config materialization**
Modify `app-instance/backend/beaver/interfaces/channels/connections/connectors.py`:
```python
from beaver.foundation.config.schema import ChannelConfig
```
Add this method to `ChannelConnectorRegistry`:
```python
    async def materialize_channel_configs(self) -> dict[str, ChannelConfig]:
        channels: dict[str, ChannelConfig] = {}
        for spec in await self.materialize_connected_runtime_specs():
            secrets = self.credential_store.get(spec.secrets_ref) if spec.secrets_ref else {}
            channels[spec.channel_id] = ChannelConfig(
                enabled=True,
                kind=spec.kind,
                mode=spec.mode,
                account_id=spec.account_id,
                display_name=spec.display_name,
                config=dict(spec.config),
                secrets=secrets,
            )
        return channels
```
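The method above resolves secret references only at materialization time, so runtime specs never carry secret values themselves. A reduced sketch of that pattern, assuming an in-memory vault: `SpecSketch`, `VaultSketch`, and `resolve` are hypothetical stand-ins, not the Beaver classes.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class SpecSketch:
    """Hypothetical reduced runtime spec: an id plus an optional secrets reference."""

    channel_id: str
    secrets_ref: Optional[str] = None
    config: dict[str, Any] = field(default_factory=dict)


class VaultSketch:
    """Hypothetical in-memory vault keyed by reference."""

    def __init__(self) -> None:
        self._values: dict[str, dict[str, str]] = {}

    def put(self, ref: str, values: dict[str, str]) -> str:
        self._values[ref] = dict(values)
        return ref

    def get(self, ref: str) -> dict[str, str]:
        return dict(self._values[ref])


def resolve(specs: list[SpecSketch], vault: VaultSketch) -> dict[str, dict[str, Any]]:
    """Map channel ids to config plus secrets, dereferencing only at this point."""
    resolved: dict[str, dict[str, Any]] = {}
    for spec in specs:
        secrets = vault.get(spec.secrets_ref) if spec.secrets_ref else {}
        resolved[spec.channel_id] = {"config": dict(spec.config), "secrets": secrets}
    return resolved


vault = VaultSketch()
ref = vault.put("cred-1", {"botToken": "token-1"})
channels = resolve([SpecSketch("telegram-main", ref), SpecSketch("open-channel")], vault)
print(sorted(channels))
```

A spec without a reference yields an empty secrets mapping rather than an error, matching the `if spec.secrets_ref else {}` branch.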
- [ ] **Step 5: Add app helpers for connection state paths and registry construction**
Modify `app-instance/backend/beaver/interfaces/web/app.py` imports:
```python
from beaver.interfaces.channels.connections import (
    ChannelConnectionStore,
    ChannelConnectorRegistry,
    CredentialStore,
    TelegramConnector,
)
```
Add helper functions near `get_channel_runtime()`:
```python
def _connection_state_dir(workspace: Path) -> Path:
    return Path(workspace) / "state" / "channel_connections"


def _build_channel_connector_registry(workspace: Path) -> ChannelConnectorRegistry:
    state_dir = _connection_state_dir(workspace)
    connection_store = ChannelConnectionStore(state_dir / "connections.json")
    credential_store = CredentialStore(state_dir / "credentials.json")
    registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
    registry.register(
        TelegramConnector(
            connection_store=connection_store,
            credential_store=credential_store,
        )
    )
    return registry
```
- [ ] **Step 6: Merge materialized connections into runtime startup**
Modify the lifespan block in `app-instance/backend/beaver/interfaces/web/app.py` where `ChannelRuntime` is created:
```python
loaded = attached_service.create_loop().boot()
app.state.channel_connection_workspace = loaded.workspace
connector_registry = _build_channel_connector_registry(loaded.workspace)
app.state.channel_connector_registry = connector_registry
connection_channels = await connector_registry.materialize_channel_configs()
runtime_channels = dict(loaded.config.channels)
runtime_channels.update(connection_channels)
channel_runtime = ChannelRuntime(
    service=attached_service,
    workspace=loaded.workspace,
    channels=runtime_channels,
)
```
Keep `app.state.channel_connector_registry = connector_registry` before runtime startup so API handlers can reuse the same stores.
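The merge order in Step 6 has a consequence worth spelling out: `dict.update` overwrites existing keys, so a connection-derived channel replaces a statically configured channel with the same id. A minimal sketch with made-up channel ids and plain dict values standing in for `ChannelConfig` objects:

```python
# Hypothetical channel ids and values, used only to show merge precedence.
static_channels = {
    "telegram-main": {"source": "config.json", "enabled": False},
    "web": {"source": "config.json", "enabled": True},
}
connection_channels = {
    "telegram-main": {"source": "connection", "enabled": True},
}

runtime_channels = dict(static_channels)      # shallow copy, so the loaded mapping is not mutated
runtime_channels.update(connection_channels)  # connection entries win on key collisions

print(runtime_channels["telegram-main"]["source"])
```

Channels that exist only in the static config pass through unchanged, and the original mapping still holds its own `telegram-main` entry after the merge.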
- [ ] **Step 7: Run registry tests**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connector_registry.py -q
```
Expected: all tests pass.
- [ ] **Step 8: Commit Task 4**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections/connectors.py app-instance/backend/beaver/interfaces/web/app.py app-instance/backend/tests/unit/test_channel_connector_registry.py
git commit -m "feat: materialize channel connections into runtime config"
```
---
### Task 5: Connection Control API
**Files:**
- Modify: `app-instance/backend/beaver/interfaces/web/schemas/chat.py`
- Modify: `app-instance/backend/beaver/interfaces/web/schemas/__init__.py`
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
- Test: `app-instance/backend/tests/unit/test_channel_connection_api.py`
- [ ] **Step 1: Write failing API tests**
Create `app-instance/backend/tests/unit/test_channel_connection_api.py`:
```python
from __future__ import annotations

from fastapi.testclient import TestClient

from beaver.interfaces.web.app import create_app
from beaver.services.agent_service import AgentService


def test_channel_connection_api_creates_updates_lists_and_revokes(tmp_path) -> None:
    config_path = tmp_path / "config.json"
    config_path.write_text('{"agents": {"defaults": {"workspace": "%s"}}, "providers": {}}' % str(tmp_path), encoding="utf-8")
    service = AgentService(config_path=config_path)
    app = create_app(service=service, manage_service_lifecycle=False)
    try:
        with TestClient(app) as client:
            created = client.post(
                "/api/channel-connections",
                json={
                    "kind": "telegram",
                    "mode": "polling",
                    "displayName": "Telegram Main",
                    "authType": "token",
                    "secrets": {"botToken": "token-1"},
                    "config": {"maxMessageChars": 4096, "requireMentionInGroups": True},
                },
            )
            assert created.status_code == 200
            body = created.json()
            connection_id = body["connection"]["connection_id"]
            assert body["connection"]["kind"] == "telegram"
            assert body["connection"]["status"] == "draft"
            assert "credentials_ref" not in body["connection"]
            assert body["connection"]["runtime_config"] == {
                "max_message_chars": 4096,
                "require_mention_in_groups": True,
            }
            assert body["credentials"] == {"botToken": "***"}

            patched = client.patch(
                f"/api/channel-connections/{connection_id}",
                json={
                    "displayName": "Telegram Ops",
                    "config": {"maxMessageChars": 2048},
                    "secrets": {"botToken": "token-2"},
                },
            )
            assert patched.status_code == 200
            assert patched.json()["connection"]["display_name"] == "Telegram Ops"
            assert patched.json()["connection"]["runtime_config"] == {"max_message_chars": 2048}
            assert patched.json()["credentials"] == {"botToken": "***"}

            listed = client.get("/api/channel-connections")
            assert listed.status_code == 200
            assert listed.json()[0]["connection_id"] == connection_id
            assert "credentials_ref" not in listed.json()[0]

            revoked = client.post(f"/api/channel-connections/{connection_id}/revoke")
            assert revoked.status_code == 200
            assert revoked.json()["connection"]["status"] == "revoked"
    finally:
        service.close()


def test_channel_connectors_api_lists_registered_connectors(tmp_path) -> None:
    config_path = tmp_path / "config.json"
    config_path.write_text('{"agents": {"defaults": {"workspace": "%s"}}, "providers": {}}' % str(tmp_path), encoding="utf-8")
    service = AgentService(config_path=config_path)
    app = create_app(service=service, manage_service_lifecycle=False)
    try:
        with TestClient(app) as client:
            response = client.get("/api/channel-connectors")
    finally:
        service.close()
    assert response.status_code == 200
    assert response.json() == [{"kind": "telegram"}]
```
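The tests above build the config file with `%` string formatting, which produces invalid JSON when the workspace path contains backslashes or quotes (for example on Windows). A hedged alternative is to serialize with `json.dumps`. `build_config_text` is a hypothetical helper, and the sample path is made up:

```python
import json
from pathlib import PureWindowsPath


def build_config_text(workspace: str) -> str:
    """Serialize the minimal test config; json.dumps escapes backslashes and quotes."""
    return json.dumps({"agents": {"defaults": {"workspace": workspace}}, "providers": {}})


# PureWindowsPath renders with backslashes on any platform, which makes the problem visible.
windows_style = str(PureWindowsPath("C:/Users/dev/tmp"))

round_tripped = json.loads(build_config_text(windows_style))["agents"]["defaults"]["workspace"]

# The naive formatting leaves raw backslashes inside the JSON string literal.
try:
    json.loads('{"workspace": "%s"}' % windows_style)
    naive_is_valid = True
except json.JSONDecodeError:
    naive_is_valid = False

print(round_tripped == windows_style, naive_is_valid)
```

On POSIX temp paths the original formatting works, so this only matters if the suite is expected to run on Windows as well.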
- [ ] **Step 2: Run API tests to verify failure**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connection_api.py -q
```
Expected: fail with `404 Not Found` for `/api/channel-connections`.
- [ ] **Step 3: Add web schemas**
Append to `app-instance/backend/beaver/interfaces/web/schemas/chat.py` after `WebChannelConfigResponse`:
```python
class WebChannelConnectionCreateRequest(BaseModel):
    """Create a channel connection from the setup UI."""

    kind: str
    mode: str
    display_name: str | None = Field(default=None, alias="displayName")
    owner_user_id: str | None = Field(default=None, alias="ownerUserId")
    auth_type: str = Field(default="token", alias="authType")
    account_id: str | None = Field(default=None, alias="accountId")
    config: dict[str, Any] = Field(default_factory=dict)
    secrets: dict[str, str | None] = Field(default_factory=dict)


class WebChannelConnectionResponse(BaseModel):
    """Channel connection response with redacted credentials."""

    connection: dict[str, Any]
    credentials: dict[str, str] = Field(default_factory=dict)


class WebChannelConnectionUpdateRequest(BaseModel):
    """Update editable channel connection setup fields."""

    display_name: str | None = Field(default=None, alias="displayName")
    account_id: str | None = Field(default=None, alias="accountId")
    config: dict[str, Any] | None = None
    secrets: dict[str, str | None] | None = None


class WebChannelValidationResponse(BaseModel):
    """Connector validation response."""

    ok: bool
    status: str
    account_id: str | None = None
    display_name: str | None = None
    error: str | None = None
    metadata: dict[str, Any] = Field(default_factory=dict)
    connection: dict[str, Any]
```
- [ ] **Step 4: Export web schemas**
Modify `app-instance/backend/beaver/interfaces/web/schemas/__init__.py` imports and `__all__` to include:
```python
WebChannelConnectionCreateRequest,
WebChannelConnectionResponse,
WebChannelConnectionUpdateRequest,
WebChannelValidationResponse,
```
- [ ] **Step 5: Add connector registry accessors to app.py**
Modify imports in `app-instance/backend/beaver/interfaces/web/app.py`:
```python
from beaver.interfaces.web.schemas import (
    WebChannelConnectionCreateRequest,
    WebChannelConnectionResponse,
    WebChannelConnectionUpdateRequest,
    WebChannelValidationResponse,
)
```
Add helper:
```python
def get_channel_connector_registry(request: Request) -> ChannelConnectorRegistry:
    registry = getattr(request.app.state, "channel_connector_registry", None)
    if not isinstance(registry, ChannelConnectorRegistry):
        workspace = getattr(request.app.state, "channel_connection_workspace", None)
        if workspace is None:
            raise RuntimeError("Channel connector registry unavailable before service boot")
        registry = _build_channel_connector_registry(workspace)
        request.app.state.channel_connector_registry = registry
    return registry


def _normalize_connection_config(config: dict[str, Any] | None) -> dict[str, Any]:
    if not isinstance(config, dict):
        return {}
    return {
        _camel_to_snake_text(str(key)): value
        for key, value in config.items()
        if str(key).strip()
    }


def _camel_to_snake_text(value: str) -> str:
    result: list[str] = []
    for char in value:
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


def _connection_response_view(connection: Any) -> dict[str, Any]:
    view = connection.to_dict()
    view.pop("credentials_ref", None)
    view.pop("connector_ref", None)
    view.pop("pairing_session_id", None)
    return view
```
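The key normalization above is a per-character rule, which has an edge case for acronyms. The sketch below restates the rule as a standalone function (`camel_to_snake` is a copy made for illustration) and runs it on a few sample keys; the sample keys other than the two used in the API test are made up.

```python
def camel_to_snake(value: str) -> str:
    """Insert '_' before every uppercase letter except a leading one, then lowercase."""
    result: list[str] = []
    for char in value:
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


samples = ["maxMessageChars", "requireMentionInGroups", "Kind", "botID", "already_snake"]
converted = {name: camel_to_snake(name) for name in samples}
for name, snake in converted.items():
    print(f"{name} -> {snake}")
```

Consecutive capitals are split letter by letter, so `botID` becomes `bot_i_d`. If two submitted keys normalize to the same name, the later one wins in the dict comprehension, so config keys should avoid acronyms and mixed spellings of the same option.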
- [ ] **Step 6: Add connection API routes**
Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/interfaces/web/app.py`:
```python
@app.get("/api/channel-connectors")
async def list_channel_connectors(request: Request) -> list[dict[str, str]]:
    return get_channel_connector_registry(request).connectors()


@app.get("/api/channel-connections")
async def list_channel_connections(request: Request) -> list[dict[str, Any]]:
    registry = get_channel_connector_registry(request)
    return [_connection_response_view(connection) for connection in registry.connection_store.list()]


@app.post("/api/channel-connections", response_model=WebChannelConnectionResponse)
async def create_channel_connection(
    request: Request,
    payload: WebChannelConnectionCreateRequest,
) -> WebChannelConnectionResponse:
    registry = get_channel_connector_registry(request)
    kind = _clean_text(payload.kind)
    mode = _clean_text(payload.mode)
    if not kind:
        raise HTTPException(status_code=400, detail="Connection kind is required")
    if not mode:
        raise HTTPException(status_code=400, detail="Connection mode is required")
    secrets = {key: value for key, value in payload.secrets.items() if value}
    credentials_ref = registry.credential_store.put(kind=kind, values=secrets) if secrets else None
    connection = registry.connection_store.create(
        kind=kind,
        mode=mode,
        display_name=_clean_text(payload.display_name) or kind,
        account_id=_clean_text(payload.account_id) or "",
        owner_user_id=_clean_text(payload.owner_user_id) or None,
        auth_type=_clean_text(payload.auth_type) or "token",
        credentials_ref=credentials_ref,
        runtime_config=_normalize_connection_config(payload.config),
    )
    return WebChannelConnectionResponse(
        connection=_connection_response_view(connection),
        credentials=registry.credential_store.redacted(credentials_ref),
    )


@app.patch("/api/channel-connections/{connection_id}", response_model=WebChannelConnectionResponse)
async def update_channel_connection(
    connection_id: str,
    request: Request,
    payload: WebChannelConnectionUpdateRequest,
) -> WebChannelConnectionResponse:
    registry = get_channel_connector_registry(request)
    try:
        connection = registry.connection_store.get(connection_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Channel connection not found")
    if payload.display_name is not None:
        connection.display_name = _clean_text(payload.display_name) or connection.display_name
    if payload.account_id is not None:
        connection.account_id = _clean_text(payload.account_id) or connection.account_id
    if payload.config is not None:
        connection.runtime_config = _normalize_connection_config(payload.config)
    if payload.secrets:
        secrets = {key: value for key, value in payload.secrets.items() if value}
        if secrets:
            # TODO: add credential GC when connection updates credentials.
            connection.credentials_ref = registry.credential_store.put(kind=connection.kind, values=secrets)
    connection = registry.connection_store.update(connection)
    return WebChannelConnectionResponse(
        connection=_connection_response_view(connection),
        credentials=registry.credential_store.redacted(connection.credentials_ref),
    )


@app.get("/api/channel-connections/{connection_id}", response_model=WebChannelConnectionResponse)
async def get_channel_connection(connection_id: str, request: Request) -> WebChannelConnectionResponse:
    registry = get_channel_connector_registry(request)
    try:
        connection = registry.connection_store.get(connection_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Channel connection not found")
    return WebChannelConnectionResponse(
        connection=_connection_response_view(connection),
        credentials=registry.credential_store.redacted(connection.credentials_ref),
    )


@app.post("/api/channel-connections/{connection_id}/validate", response_model=WebChannelValidationResponse)
async def validate_channel_connection(connection_id: str, request: Request) -> WebChannelValidationResponse:
    registry = get_channel_connector_registry(request)
    try:
        result = await registry.validate(connection_id)
        connection = registry.connection_store.get(connection_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Channel connection not found")
    return WebChannelValidationResponse(
        ok=result.ok,
        status=result.status,
        account_id=result.account_id,
        display_name=result.display_name,
        error=result.error,
        metadata=result.metadata,
        connection=_connection_response_view(connection),
    )


@app.post("/api/channel-connections/{connection_id}/revoke", response_model=WebChannelConnectionResponse)
async def revoke_channel_connection(connection_id: str, request: Request) -> WebChannelConnectionResponse:
    registry = get_channel_connector_registry(request)
    try:
        await registry.revoke(connection_id)
        connection = registry.connection_store.get(connection_id)
    except KeyError:
        raise HTTPException(status_code=404, detail="Channel connection not found")
    return WebChannelConnectionResponse(connection=_connection_response_view(connection), credentials={})
```
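The create and update routes filter submitted secrets before storing them and return only a redacted view. The sketch below isolates those two steps. `drop_empty` copies the filter expression used in the routes; `redact` is a hypothetical stand-in for `CredentialStore.redacted`, whose `"***"` mask is taken from the API test expectations rather than from the store implementation. The secret names other than `botToken` are made up.

```python
from typing import Optional


def drop_empty(secrets: dict[str, Optional[str]]) -> dict[str, str]:
    """Keep only secrets with a truthy value, as the create and update routes do."""
    return {key: value for key, value in secrets.items() if value}


def redact(secrets: dict[str, str]) -> dict[str, str]:
    """Hypothetical redaction: keep the key names, mask every value."""
    return {key: "***" for key in secrets}


submitted = {"botToken": "token-1", "webhookSecret": None, "proxyPassword": ""}
stored = drop_empty(submitted)
visible = redact(stored)
print(visible)
```

One consequence of the filter: a `PATCH` that sends `null` or an empty string for a secret leaves the stored credentials untouched, so this API cannot clear a secret. Removing credentials would need an explicit operation in a later plan.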
- [ ] **Step 7: Run API tests**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_connection_api.py -q
```
Expected: `2 passed`.
- [ ] **Step 8: Run focused backend tests**
Run:
```bash
cd app-instance/backend
uv run pytest \
  tests/unit/test_channel_connection_store.py \
  tests/unit/test_channel_connector_registry.py \
  tests/unit/test_telegram_channel_connector.py \
  tests/unit/test_channel_connection_api.py \
  -q
```
Expected: all focused connector tests pass.
- [ ] **Step 9: Commit Task 5**
```bash
git add \
  app-instance/backend/beaver/interfaces/web/app.py \
  app-instance/backend/beaver/interfaces/web/schemas/chat.py \
  app-instance/backend/beaver/interfaces/web/schemas/__init__.py \
  app-instance/backend/tests/unit/test_channel_connection_api.py
git commit -m "feat: add channel connection control api"
```
---
### Task 6: Final Verification And Spec Alignment
**Files:**
- Review: `docs/superpowers/specs/2026-06-02-channel-connectors-and-pairing-design.md`
- Review: `docs/superpowers/specs/2026-06-02-chat-platform-channel-adapters-design.md`
- Review: `docs/superpowers/specs/2026-06-01-terminal-websocket-channel-design.md`
- [ ] **Step 1: Run connector and existing channel tests**
Run:
```bash
cd app-instance/backend
uv run pytest \
  tests/unit/test_channel_connection_store.py \
  tests/unit/test_channel_connector_registry.py \
  tests/unit/test_telegram_channel_connector.py \
  tests/unit/test_channel_connection_api.py \
  tests/unit/test_channel_runtime.py \
  tests/unit/test_telegram_channel_adapter.py \
  -q
```
Expected: all listed tests pass.
- [ ] **Step 2: Run import tests**
Run:
```bash
cd app-instance/backend
uv run pytest tests/unit/test_imports.py -q
```
Expected: all import tests pass.
- [ ] **Step 3: Scan for leaked secret values in connector events and responses**
Run:
```bash
cd app-instance/backend
rg -n "token-1|token-2|bad-token|secret-token" tests/unit beaver || true
```
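Expected: the test fixture strings appear only in files under `tests/unit`. Any match under `beaver`, including event log code, is a leak that must be fixed. The trailing `|| true` keeps the step from failing when `rg` finds no matches.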
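The same check can be scripted so that it fails automatically instead of relying on reading `rg` output. A minimal sketch, assuming only Python source files need scanning: `find_leaks` is a hypothetical helper, and the temporary directory layout exists only to demonstrate it.

```python
import tempfile
from pathlib import Path

FIXTURE_SECRETS = ("token-1", "token-2", "bad-token", "secret-token")


def find_leaks(root: Path, tests_dir: Path) -> list[tuple[str, int, str]]:
    """Return (file, line number, secret) for fixture strings found outside tests_dir."""
    leaks: list[tuple[str, int, str]] = []
    for path in sorted(root.rglob("*.py")):
        if tests_dir in path.parents:
            continue  # fixture strings are expected in test files
        for number, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
            for secret in FIXTURE_SECRETS:
                if secret in line:
                    leaks.append((path.relative_to(root).as_posix(), number, secret))
    return leaks


# Demonstration on a throwaway tree: one test file, one clean module, one leaky module.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "tests" / "unit").mkdir(parents=True)
    (root / "beaver").mkdir()
    (root / "tests" / "unit" / "test_api.py").write_text('TOKEN = "token-1"\n', encoding="utf-8")
    (root / "beaver" / "clean.py").write_text("VALUE = 1\n", encoding="utf-8")
    (root / "beaver" / "leaky.py").write_text('# ok\nDEBUG = "bad-token"\n', encoding="utf-8")
    leaks = find_leaks(root, root / "tests")

print(leaks)
```

Against the real tree, an empty result from `find_leaks(Path("."), Path("tests"))` run inside `app-instance/backend` would correspond to the expected outcome of this step.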
- [ ] **Step 4: Update adapter spec wording if still contradictory**
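If `docs/superpowers/specs/2026-06-02-chat-platform-channel-adapters-design.md` still says pairing is out of scope and Node sidecars are disallowed, change only the Non-Goals and Access Control text. Replace the Non-Goals bullet with the first snippet and the Access Control sentence with the second: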
```markdown
- Use internal adapters by default, but allow external connector processes where platform SDK or login state requires them.
```
```markdown
Pairing is owned by the connector layer. Platform adapters assume a materialized `ChannelConnection` and adapter-ready runtime config.
```
- [ ] **Step 5: Commit spec alignment if changed**
If Step 4 changed docs:
```bash
git add docs/superpowers/specs/2026-06-02-chat-platform-channel-adapters-design.md
git commit -m "docs: align channel adapter spec with connector layer"
```
If Step 4 made no change, do not create an empty commit.
- [ ] **Step 6: Summarize remaining rollout**
Record in the final implementation response that this first plan does not implement Terminal pairing, Feishu/Lark connector, Weixin external connector, QQBot connector, frontend wizard, or hot adapter restart. Those are separate plans.