docs: refine channel connectors foundation plan

This commit is contained in:
2026-06-02 15:51:16 +08:00
parent d74a1c9c12
commit b25713a141

View File

@ -32,6 +32,7 @@ Excluded from this plan:
- QQBot connector.
- Frontend connection wizard.
- Hot starting/stopping adapters without backend restart.
- Multi-process-safe storage. The JSON stores use `threading.Lock` plus atomic file replace for the single backend process used in phase 1. Production multi-worker deployment needs a file lock or database-backed store.
## File Structure
@ -77,10 +78,7 @@ Create `app-instance/backend/tests/unit/test_channel_connection_store.py`:
```python
from __future__ import annotations
from datetime import datetime, timezone
from beaver.interfaces.channels.connections import (
ChannelConnection,
ChannelConnectionStore,
CredentialStore,
PairingTokenStore,
@ -578,6 +576,7 @@ class FakeConnector:
def __init__(self) -> None:
self.validated: list[str] = []
self.revoked: list[str] = []
async def validate(self, connection_id: str) -> ValidationResult:
self.validated.append(connection_id)
@ -594,6 +593,7 @@ class FakeConnector:
)
async def revoke(self, connection_id: str) -> None:
self.revoked.append(connection_id)
return None
@ -654,6 +654,32 @@ def test_connector_registry_materializes_only_connected_connections(tmp_path) ->
assert connection_store.get(draft.connection_id).status == "draft"
asyncio.run(run())
def test_connector_registry_revoke_calls_connector_and_updates_store(tmp_path) -> None:
async def run() -> None:
connection_store = ChannelConnectionStore(tmp_path / "connections.json")
credential_store = CredentialStore(tmp_path / "credentials.json")
connector = FakeConnector()
registry = ChannelConnectorRegistry(connection_store=connection_store, credential_store=credential_store)
registry.register(connector)
connection = connection_store.create(
kind="fake",
mode="webhook",
display_name="Fake",
account_id="fake-account",
owner_user_id=None,
auth_type="token",
)
connection_store.update_status(connection.connection_id, status="connected", last_error=None)
await registry.revoke(connection.connection_id)
assert connector.revoked == [connection.connection_id]
assert connection_store.get(connection.connection_id).status == "revoked"
asyncio.run(run())
```
- [ ] **Step 2: Run tests to verify they fail**
@ -780,7 +806,7 @@ cd app-instance/backend
uv run pytest tests/unit/test_channel_connector_registry.py -q
```
Expected: `2 passed`.
Expected: `3 passed`.
- [ ] **Step 6: Commit Task 2**
@ -920,6 +946,32 @@ def test_telegram_connector_validation_failure_sets_error_status(tmp_path) -> No
assert "invalid token" in (result.error or "")
asyncio.run(run())
def test_telegram_connector_revoke_leaves_store_status_to_registry(tmp_path) -> None:
async def run() -> None:
connection_store = ChannelConnectionStore(tmp_path / "connections.json")
credential_store = CredentialStore(tmp_path / "credentials.json")
connection = connection_store.create(
kind="telegram",
mode="polling",
display_name="Telegram Main",
account_id="telegram:12345",
owner_user_id=None,
auth_type="token",
)
connection_store.update_status(connection.connection_id, status="connected", last_error=None)
connector = TelegramConnector(
connection_store=connection_store,
credential_store=credential_store,
client_factory=lambda token: FakeTelegramClient(),
)
await connector.revoke(connection.connection_id)
assert connection_store.get(connection.connection_id).status == "connected"
asyncio.run(run())
```
- [ ] **Step 2: Run tests to verify they fail**
@ -933,7 +985,18 @@ uv run pytest tests/unit/test_telegram_channel_connector.py -q
Expected: fail with `ImportError: cannot import name 'TelegramConnector'`.
- [ ] **Step 3: Implement TelegramConnector**
- [ ] **Step 3: Verify Telegram dependency**
Run:
```bash
cd app-instance/backend
rg -n "python-telegram-bot" pyproject.toml uv.lock | sed -n '1,20p'
```
Expected output includes `python-telegram-bot>=22.0,<23.0`. The default client factory may use `from telegram import Bot`, and `Bot.get_me()` is awaitable in this dependency line.
- [ ] **Step 4: Implement TelegramConnector**
Create `app-instance/backend/beaver/interfaces/channels/connections/telegram.py`:
@ -1003,7 +1066,9 @@ class TelegramConnector:
)
async def revoke(self, connection_id: str) -> None:
self.connection_store.revoke(connection_id)
# Telegram bot tokens do not have a Beaver-managed platform revoke action.
# The registry owns local connection state transitions.
return None
def _bot_token(self, credentials_ref: str | None) -> str:
if not credentials_ref:
@ -1030,7 +1095,7 @@ def _default_client_factory(token: str) -> Any:
return Bot(token=token)
```
- [ ] **Step 4: Export TelegramConnector**
- [ ] **Step 5: Export TelegramConnector**
Modify `app-instance/backend/beaver/interfaces/channels/connections/__init__.py`:
@ -1056,7 +1121,7 @@ __all__ = [
]
```
- [ ] **Step 5: Run Telegram connector tests**
- [ ] **Step 6: Run Telegram connector tests**
Run:
@ -1065,9 +1130,9 @@ cd app-instance/backend
uv run pytest tests/unit/test_telegram_channel_connector.py -q
```
Expected: `3 passed`.
Expected: `4 passed`.
- [ ] **Step 6: Commit Task 3**
- [ ] **Step 7: Commit Task 3**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections app-instance/backend/tests/unit/test_telegram_channel_connector.py
@ -1082,7 +1147,22 @@ git commit -m "feat: add telegram channel connector"
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
- Test: `app-instance/backend/tests/unit/test_channel_connector_registry.py`
- [ ] **Step 1: Extend registry tests for ChannelConfig materialization**
- [ ] **Step 1: Verify ChannelConfig fields**
Run:
```bash
cd app-instance/backend
uv run python - <<'PY'
from dataclasses import fields
from beaver.foundation.config.schema import ChannelConfig
print([field.name for field in fields(ChannelConfig)])
PY
```
Expected output includes `enabled`, `kind`, `mode`, `account_id`, `display_name`, `config`, and `secrets`.
- [ ] **Step 2: Extend registry tests for ChannelConfig materialization**
Append to `app-instance/backend/tests/unit/test_channel_connector_registry.py`:
@ -1131,7 +1211,7 @@ def test_connector_registry_materializes_channel_configs_with_credentials(tmp_pa
asyncio.run(run())
```
- [ ] **Step 2: Run registry tests to verify failure**
- [ ] **Step 3: Run registry tests to verify failure**
Run:
@ -1142,7 +1222,7 @@ uv run pytest tests/unit/test_channel_connector_registry.py::test_connector_regi
Expected: fail with `AttributeError: 'ChannelConnectorRegistry' object has no attribute 'materialize_channel_configs'`.
- [ ] **Step 3: Implement channel config materialization**
- [ ] **Step 4: Implement channel config materialization**
Modify `app-instance/backend/beaver/interfaces/channels/connections/connectors.py`:
@ -1169,7 +1249,7 @@ Add this method to `ChannelConnectorRegistry`:
return channels
```
- [ ] **Step 4: Add app helpers for connection state paths and registry construction**
- [ ] **Step 5: Add app helpers for connection state paths and registry construction**
Modify `app-instance/backend/beaver/interfaces/web/app.py` imports:
@ -1178,7 +1258,6 @@ from beaver.interfaces.channels.connections import (
ChannelConnectionStore,
ChannelConnectorRegistry,
CredentialStore,
PairingTokenStore,
TelegramConnector,
)
```
@ -1204,12 +1283,13 @@ def _build_channel_connector_registry(workspace: Path) -> ChannelConnectorRegist
return registry
```
- [ ] **Step 5: Merge materialized connections into runtime startup**
- [ ] **Step 6: Merge materialized connections into runtime startup**
Modify the lifespan block in `app-instance/backend/beaver/interfaces/web/app.py` where `ChannelRuntime` is created:
```python
loaded = attached_service.create_loop().boot()
app.state.channel_connection_workspace = loaded.workspace
connector_registry = _build_channel_connector_registry(loaded.workspace)
app.state.channel_connector_registry = connector_registry
connection_channels = await connector_registry.materialize_channel_configs()
@ -1224,7 +1304,7 @@ Modify the lifespan block in `app-instance/backend/beaver/interfaces/web/app.py`
Keep `app.state.channel_connector_registry = connector_registry` before runtime startup so API handlers can reuse the same stores.
- [ ] **Step 6: Run registry tests**
- [ ] **Step 7: Run registry tests**
Run:
@ -1235,7 +1315,7 @@ uv run pytest tests/unit/test_channel_connector_registry.py -q
Expected: all tests pass.
- [ ] **Step 7: Commit Task 4**
- [ ] **Step 8: Commit Task 4**
```bash
git add app-instance/backend/beaver/interfaces/channels/connections/connectors.py app-instance/backend/beaver/interfaces/web/app.py app-instance/backend/tests/unit/test_channel_connector_registry.py
@ -1265,7 +1345,7 @@ from beaver.interfaces.web.app import create_app
from beaver.services.agent_service import AgentService
def test_channel_connection_api_creates_lists_and_revokes(tmp_path) -> None:
def test_channel_connection_api_creates_updates_lists_and_revokes(tmp_path) -> None:
config_path = tmp_path / "config.json"
config_path.write_text('{"agents": {"defaults": {"workspace": "%s"}}, "providers": {}}' % str(tmp_path), encoding="utf-8")
service = AgentService(config_path=config_path)
@ -1289,11 +1369,26 @@ def test_channel_connection_api_creates_lists_and_revokes(tmp_path) -> None:
connection_id = body["connection"]["connection_id"]
assert body["connection"]["kind"] == "telegram"
assert body["connection"]["status"] == "draft"
assert "credentials_ref" not in body["connection"]
assert body["credentials"] == {"botToken": "***"}
patched = client.patch(
f"/api/channel-connections/{connection_id}",
json={
"displayName": "Telegram Ops",
"config": {"maxMessageChars": 2048},
"secrets": {"botToken": "token-2"},
},
)
assert patched.status_code == 200
assert patched.json()["connection"]["display_name"] == "Telegram Ops"
assert patched.json()["connection"]["runtime_config"] == {"maxMessageChars": 2048}
assert patched.json()["credentials"] == {"botToken": "***"}
listed = client.get("/api/channel-connections")
assert listed.status_code == 200
assert listed.json()[0]["connection_id"] == connection_id
assert "credentials_ref" not in listed.json()[0]
revoked = client.post(f"/api/channel-connections/{connection_id}/revoke")
assert revoked.status_code == 200
@ -1354,6 +1449,15 @@ class WebChannelConnectionResponse(BaseModel):
credentials: dict[str, str] = Field(default_factory=dict)
class WebChannelConnectionUpdateRequest(BaseModel):
"""Update editable channel connection setup fields."""
display_name: str | None = Field(default=None, alias="displayName")
account_id: str | None = Field(default=None, alias="accountId")
config: dict[str, Any] | None = None
secrets: dict[str, str | None] | None = None
class WebChannelValidationResponse(BaseModel):
"""Connector validation response."""
@ -1373,6 +1477,7 @@ Modify `app-instance/backend/beaver/interfaces/web/schemas/__init__.py` imports
```python
WebChannelConnectionCreateRequest,
WebChannelConnectionResponse,
WebChannelConnectionUpdateRequest,
WebChannelValidationResponse,
```
@ -1384,6 +1489,7 @@ Modify imports in `app-instance/backend/beaver/interfaces/web/app.py`:
from beaver.interfaces.web.schemas import (
WebChannelConnectionCreateRequest,
WebChannelConnectionResponse,
WebChannelConnectionUpdateRequest,
WebChannelValidationResponse,
)
```
@ -1394,11 +1500,21 @@ Add helper:
def get_channel_connector_registry(request: Request) -> ChannelConnectorRegistry:
registry = getattr(request.app.state, "channel_connector_registry", None)
if not isinstance(registry, ChannelConnectorRegistry):
agent_service = get_agent_service(request)
workspace = agent_service.loader.workspace
workspace = getattr(request.app.state, "channel_connection_workspace", None)
if workspace is None:
agent_service = get_agent_service(request)
workspace = agent_service.loader.workspace
registry = _build_channel_connector_registry(workspace)
request.app.state.channel_connector_registry = registry
return registry
def _connection_response_view(connection: Any) -> dict[str, Any]:
view = connection.to_dict()
view.pop("credentials_ref", None)
view.pop("connector_ref", None)
view.pop("pairing_session_id", None)
return view
```
- [ ] **Step 6: Add connection API routes**
@ -1413,7 +1529,7 @@ Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/
@app.get("/api/channel-connections")
async def list_channel_connections(request: Request) -> list[dict[str, Any]]:
registry = get_channel_connector_registry(request)
return [connection.to_dict() for connection in registry.connection_store.list()]
return [_connection_response_view(connection) for connection in registry.connection_store.list()]
@app.post("/api/channel-connections", response_model=WebChannelConnectionResponse)
async def create_channel_connection(
@ -1440,10 +1556,37 @@ Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/
runtime_config=payload.config or {},
)
return WebChannelConnectionResponse(
connection=connection.to_dict(),
connection=_connection_response_view(connection),
credentials=registry.credential_store.redacted(credentials_ref),
)
@app.patch("/api/channel-connections/{connection_id}", response_model=WebChannelConnectionResponse)
async def update_channel_connection(
connection_id: str,
request: Request,
payload: WebChannelConnectionUpdateRequest,
) -> WebChannelConnectionResponse:
registry = get_channel_connector_registry(request)
try:
connection = registry.connection_store.get(connection_id)
except KeyError:
raise HTTPException(status_code=404, detail="Channel connection not found")
if payload.display_name is not None:
connection.display_name = _clean_text(payload.display_name) or connection.display_name
if payload.account_id is not None:
connection.account_id = _clean_text(payload.account_id) or connection.account_id
if payload.config is not None:
connection.runtime_config = payload.config
if payload.secrets:
secrets = {key: value for key, value in payload.secrets.items() if value}
if secrets:
connection.credentials_ref = registry.credential_store.put(kind=connection.kind, values=secrets)
connection = registry.connection_store.update(connection)
return WebChannelConnectionResponse(
connection=_connection_response_view(connection),
credentials=registry.credential_store.redacted(connection.credentials_ref),
)
@app.get("/api/channel-connections/{connection_id}", response_model=WebChannelConnectionResponse)
async def get_channel_connection(connection_id: str, request: Request) -> WebChannelConnectionResponse:
registry = get_channel_connector_registry(request)
@ -1452,7 +1595,7 @@ Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/
except KeyError:
raise HTTPException(status_code=404, detail="Channel connection not found")
return WebChannelConnectionResponse(
connection=connection.to_dict(),
connection=_connection_response_view(connection),
credentials=registry.credential_store.redacted(connection.credentials_ref),
)
@ -1471,7 +1614,7 @@ Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/
display_name=result.display_name,
error=result.error,
metadata=result.metadata,
connection=connection.to_dict(),
connection=_connection_response_view(connection),
)
@app.post("/api/channel-connections/{connection_id}/revoke", response_model=WebChannelConnectionResponse)
@ -1482,7 +1625,7 @@ Add routes near existing `/api/channels` routes in `app-instance/backend/beaver/
connection = registry.connection_store.get(connection_id)
except KeyError:
raise HTTPException(status_code=404, detail="Channel connection not found")
return WebChannelConnectionResponse(connection=connection.to_dict(), credentials={})
return WebChannelConnectionResponse(connection=_connection_response_view(connection), credentials={})
```
- [ ] **Step 7: Run API tests**
@ -1567,7 +1710,7 @@ Run:
```bash
cd app-instance/backend
rg -n "token-1|bad-token|secret-token" tests/unit beaver || true
rg -n "token-1|token-2|bad-token|secret-token" tests/unit beaver || true
```
Expected: test fixture strings only appear in test files. They must not appear in implementation files or generated event log code.