1106 lines
38 KiB
Markdown
1106 lines
38 KiB
Markdown
# Terminal WebSocket Channel Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Build a text-only WebSocket channel adapter so a small terminal device can connect to Beaver through the channel runtime and receive final assistant replies.
|
|
|
|
**Architecture:** Add `TerminalWebSocketAdapter` as a channel-native adapter. The FastAPI route `/api/channels/{channel_id}/ws` delegates WebSocket handling to the adapter, while inbound and outbound traffic still flows through `ChannelRuntime`, `MessageBus`, `AgentService.handle_inbound_message()`, and `ChannelManager.dispatch_outbound()`.
|
|
|
|
**Tech Stack:** Python 3.14, FastAPI WebSocket, Beaver `ChannelRuntime`, Beaver `MessageBus`, pytest/TestClient, Next.js TypeScript status UI, nginx.
|
|
|
|
---
|
|
|
|
## Current Context
|
|
|
|
This plan assumes channel runtime v1 exists in the working tree:
|
|
|
|
- `app-instance/backend/beaver/interfaces/channels/runtime.py`
|
|
- `app-instance/backend/beaver/interfaces/channels/generic_webhook.py`
|
|
- `app-instance/backend/beaver/interfaces/channels/state.py`
|
|
- `app-instance/backend/beaver/foundation/events/message_bus.py` with `ChannelIdentity`
|
|
- `app-instance/backend/beaver/interfaces/web/app.py` with `/api/channels/{channel_id}/webhook`
|
|
|
|
Do not route terminal messages to `AgentService` from the WebSocket route. The adapter must call `ChannelRuntime.accept_inbound()` and let the runtime bridge publish outbound replies.
|
|
|
|
## Files
|
|
|
|
- Create: `app-instance/backend/beaver/interfaces/channels/terminal_websocket.py`
|
|
- Owns WebSocket connection state, JSON frame validation, connect/message/ping handling, and outbound delivery.
|
|
- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py`
|
|
- Instantiate terminal websocket adapters.
|
|
- Include terminal capabilities, websocket URL, and connected peer counts in status.
|
|
- Provide a small event recording callback for adapters.
|
|
- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py`
|
|
- Export `TerminalWebSocketAdapter`.
|
|
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
|
|
- Add `/api/channels/{channel_id}/ws` WebSocket route that delegates to terminal adapters.
|
|
- Modify: `app-instance/nginx.conf`
|
|
- Add WebSocket upgrade headers to `/api/channels/`.
|
|
- Modify: `app-instance/frontend/types/index.ts`
|
|
- Add `websocket_url` and `connected_peers` to `ChannelStatus`.
|
|
- Modify: `app-instance/frontend/app/(app)/status/page.tsx`
|
|
- Show WebSocket URL and connected peer count in channel details.
|
|
- Test: `app-instance/backend/tests/unit/test_terminal_websocket_channel.py`
|
|
- Cover connect, ping, message roundtrip, duplicates, errors, status, and disconnect/unclaimed behavior.
|
|
|
|
---
|
|
|
|
### Task 1: Add Failing WebSocket Roundtrip Tests
|
|
|
|
**Files:**
|
|
- Create: `app-instance/backend/tests/unit/test_terminal_websocket_channel.py`
|
|
|
|
- [ ] **Step 1: Write failing tests for connect, ping, and message roundtrip**
|
|
|
|
Create `app-instance/backend/tests/unit/test_terminal_websocket_channel.py`:
|
|
|
|
```python
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
from beaver.foundation.events import InboundMessage, OutboundMessage
|
|
from beaver.interfaces.web.app import create_app
|
|
from beaver.services.agent_service import AgentService
|
|
|
|
|
|
class TerminalFakeAgentService(AgentService):
|
|
def __init__(self, *, config_path: Path, delay_seconds: float = 0.0) -> None:
|
|
super().__init__(config_path=config_path)
|
|
self.delay_seconds = delay_seconds
|
|
self.inbound_calls: list[InboundMessage] = []
|
|
|
|
async def handle_inbound_message(self, inbound: InboundMessage) -> OutboundMessage:
|
|
self.inbound_calls.append(inbound)
|
|
if self.delay_seconds:
|
|
await asyncio.sleep(self.delay_seconds)
|
|
return OutboundMessage(
|
|
message_id=inbound.message_id,
|
|
channel=inbound.channel,
|
|
content=f"echo:{inbound.content}",
|
|
session_id=inbound.session_id,
|
|
finish_reason="stop",
|
|
run_id="run-1",
|
|
channel_identity=inbound.channel_identity,
|
|
)
|
|
|
|
|
|
def write_terminal_config(tmp_path: Path) -> Path:
|
|
workspace = tmp_path / "workspace"
|
|
workspace.mkdir()
|
|
config_path = tmp_path / "config.json"
|
|
config_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"agents": {"defaults": {"workspace": str(workspace), "model": "openai/gpt-5"}},
|
|
"providers": {},
|
|
"channels": {
|
|
"terminal-dev": {
|
|
"enabled": True,
|
|
"kind": "terminal",
|
|
"mode": "websocket",
|
|
"accountId": "local",
|
|
"displayName": "Terminal Dev",
|
|
"config": {"heartbeatSeconds": 30, "maxMessageChars": 20000},
|
|
}
|
|
},
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
return config_path
|
|
|
|
|
|
def test_terminal_websocket_connect_ping_and_message_roundtrip(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json(
|
|
{
|
|
"type": "connect",
|
|
"peer_id": "device-001",
|
|
"device_name": "desk-terminal",
|
|
"capabilities": ["text"],
|
|
}
|
|
)
|
|
assert websocket.receive_json() == {
|
|
"type": "connected",
|
|
"channel_id": "terminal-dev",
|
|
"session_id": "terminal-dev:local:device-001",
|
|
}
|
|
|
|
websocket.send_json({"type": "ping"})
|
|
assert websocket.receive_json() == {"type": "pong"}
|
|
|
|
websocket.send_json(
|
|
{
|
|
"type": "message",
|
|
"message_id": "device-001-000001",
|
|
"text": "hello",
|
|
}
|
|
)
|
|
assert websocket.receive_json() == {
|
|
"type": "ack",
|
|
"message_id": "device-001-000001",
|
|
"session_id": "terminal-dev:local:device-001",
|
|
"accepted": True,
|
|
}
|
|
reply = websocket.receive_json()
|
|
|
|
service.close()
|
|
assert reply == {
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"message_id": "device-001-000001",
|
|
"run_id": "run-1",
|
|
"text": "echo:hello",
|
|
"finish_reason": "stop",
|
|
}
|
|
assert len(service.inbound_calls) == 1
|
|
inbound = service.inbound_calls[0]
|
|
assert inbound.channel == "terminal-dev"
|
|
assert inbound.content == "hello"
|
|
assert inbound.content_type == "text"
|
|
assert inbound.session_id == "terminal-dev:local:device-001"
|
|
assert inbound.channel_identity is not None
|
|
assert inbound.channel_identity.peer_id == "device-001"
|
|
assert inbound.channel_identity.peer_type == "terminal"
|
|
assert inbound.channel_identity.message_id == "device-001-000001"
|
|
```
|
|
|
|
- [ ] **Step 2: Run the failing test**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_terminal_websocket_channel.py::test_terminal_websocket_connect_ping_and_message_roundtrip -q
|
|
```
|
|
|
|
Expected: fail because `/api/channels/terminal-dev/ws` does not exist or `terminal/websocket` is unsupported.
|
|
|
|
- [ ] **Step 3: Commit the failing test**
|
|
|
|
```bash
|
|
git add app-instance/backend/tests/unit/test_terminal_websocket_channel.py
|
|
git commit -m "test: cover terminal websocket channel roundtrip"
|
|
```
|
|
|
|
If channel runtime v1 changes are still uncommitted in this working tree, do not include unrelated files in this commit.
|
|
|
|
---
|
|
|
|
### Task 2: Implement TerminalWebSocketAdapter
|
|
|
|
**Files:**
|
|
- Create: `app-instance/backend/beaver/interfaces/channels/terminal_websocket.py`
|
|
- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py`
|
|
|
|
- [ ] **Step 1: Create the terminal websocket adapter**
|
|
|
|
Create `app-instance/backend/beaver/interfaces/channels/terminal_websocket.py`:
|
|
|
|
```python
|
|
"""Text-only terminal WebSocket channel adapter."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from collections.abc import Callable
|
|
from contextlib import suppress
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage
|
|
from beaver.interfaces.channels.base import ChannelInboundSink
|
|
|
|
try:
|
|
from fastapi import WebSocket
|
|
from starlette.websockets import WebSocketDisconnect
|
|
except ModuleNotFoundError: # pragma: no cover - import-only fallback
|
|
class WebSocketDisconnect(Exception):
|
|
"""Fallback disconnect exception for skeleton import environments."""
|
|
|
|
class WebSocket: # type: ignore[override]
|
|
"""Fallback websocket annotation shim."""
|
|
|
|
|
|
def _clean(value: Any) -> str:
|
|
return str(value or "").strip()
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class TerminalConnection:
|
|
websocket: WebSocket
|
|
peer_id: str
|
|
session_id: str
|
|
thread_id: str | None = None
|
|
user_id: str | None = None
|
|
device_name: str = ""
|
|
capabilities: list[str] = field(default_factory=list)
|
|
|
|
|
|
class TerminalWebSocketAdapter:
|
|
"""Accept text terminal websocket frames and deliver final assistant replies."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
channel_id: str,
|
|
kind: str,
|
|
mode: str,
|
|
account_id: str,
|
|
display_name: str = "",
|
|
inbound_sink: ChannelInboundSink,
|
|
event_recorder: Callable[..., None] | None = None,
|
|
heartbeat_seconds: float = 30,
|
|
max_message_chars: int = 20000,
|
|
) -> None:
|
|
self.channel_id = channel_id
|
|
self.kind = kind
|
|
self.mode = mode
|
|
self.account_id = account_id
|
|
self.display_name = display_name or channel_id
|
|
self.inbound_sink = inbound_sink
|
|
self.event_recorder = event_recorder
|
|
self.heartbeat_seconds = max(1.0, float(heartbeat_seconds))
|
|
self.max_message_chars = max(1, int(max_message_chars))
|
|
self.started = False
|
|
self._connections_by_session: dict[str, TerminalConnection] = {}
|
|
self._session_by_peer: dict[str, str] = {}
|
|
|
|
async def start(self) -> None:
|
|
self.started = True
|
|
|
|
async def stop(self) -> None:
|
|
self.started = False
|
|
for connection in list(self._connections_by_session.values()):
|
|
with suppress(Exception):
|
|
await connection.websocket.close(code=1001)
|
|
self._connections_by_session.clear()
|
|
self._session_by_peer.clear()
|
|
|
|
def status_extra(self) -> dict[str, Any]:
|
|
return {"connected_peers": len(self._connections_by_session)}
|
|
|
|
async def handle_websocket(self, websocket: WebSocket) -> None:
|
|
await websocket.accept()
|
|
connection: TerminalConnection | None = None
|
|
try:
|
|
while True:
|
|
try:
|
|
payload = await websocket.receive_json()
|
|
except WebSocketDisconnect:
|
|
break
|
|
except ValueError:
|
|
await websocket.send_json({"type": "error", "error": "Invalid websocket JSON payload"})
|
|
continue
|
|
if not isinstance(payload, dict):
|
|
await websocket.send_json({"type": "error", "error": "Websocket payload must be a JSON object"})
|
|
continue
|
|
|
|
frame_type = _clean(payload.get("type")).lower()
|
|
if frame_type == "ping":
|
|
await websocket.send_json({"type": "pong"})
|
|
continue
|
|
if frame_type == "connect":
|
|
connection = await self._handle_connect(websocket, payload, current=connection)
|
|
continue
|
|
if frame_type == "message":
|
|
if connection is None:
|
|
await websocket.send_json({"type": "error", "error": "connect is required before message"})
|
|
continue
|
|
await self._handle_message(websocket, connection, payload)
|
|
continue
|
|
|
|
await websocket.send_json(
|
|
{
|
|
"type": "error",
|
|
"error": f"Unsupported websocket frame type: {frame_type or '<empty>'}",
|
|
}
|
|
)
|
|
finally:
|
|
if connection is not None:
|
|
self._remove_connection(connection)
|
|
self._record(
|
|
kind="terminal_disconnected",
|
|
session_id=connection.session_id,
|
|
metadata={"peer_id": connection.peer_id, "device_name": connection.device_name},
|
|
)
|
|
|
|
async def _handle_connect(
|
|
self,
|
|
websocket: WebSocket,
|
|
payload: dict[str, Any],
|
|
*,
|
|
current: TerminalConnection | None,
|
|
) -> TerminalConnection | None:
|
|
peer_id = _clean(payload.get("peer_id"))
|
|
if not peer_id:
|
|
await websocket.send_json({"type": "error", "error": "peer_id is required"})
|
|
return current
|
|
|
|
thread_id = _clean(payload.get("thread_id")) or None
|
|
user_id = _clean(payload.get("user_id")) or None
|
|
device_name = _clean(payload.get("device_name"))
|
|
capabilities = [str(item) for item in payload.get("capabilities") or [] if item is not None]
|
|
identity = ChannelIdentity(
|
|
channel_id=self.channel_id,
|
|
kind=self.kind,
|
|
account_id=self.account_id,
|
|
peer_id=peer_id,
|
|
thread_id=thread_id,
|
|
peer_type="terminal",
|
|
user_id=user_id,
|
|
)
|
|
session_id = identity.session_id()
|
|
connection = TerminalConnection(
|
|
websocket=websocket,
|
|
peer_id=peer_id,
|
|
session_id=session_id,
|
|
thread_id=thread_id,
|
|
user_id=user_id,
|
|
device_name=device_name,
|
|
capabilities=capabilities,
|
|
)
|
|
|
|
if current is not None and current.session_id != session_id:
|
|
self._remove_connection(current)
|
|
old = self._connections_by_session.get(session_id)
|
|
if old is not None and old.websocket is not websocket:
|
|
with suppress(Exception):
|
|
await old.websocket.close(code=1000)
|
|
self._connections_by_session[session_id] = connection
|
|
self._session_by_peer[peer_id] = session_id
|
|
self._record(
|
|
kind="terminal_connected",
|
|
session_id=session_id,
|
|
metadata={"peer_id": peer_id, "device_name": device_name, "capabilities": capabilities},
|
|
)
|
|
await websocket.send_json(
|
|
{
|
|
"type": "connected",
|
|
"channel_id": self.channel_id,
|
|
"session_id": session_id,
|
|
}
|
|
)
|
|
return connection
|
|
|
|
async def _handle_message(
|
|
self,
|
|
websocket: WebSocket,
|
|
connection: TerminalConnection,
|
|
payload: dict[str, Any],
|
|
) -> None:
|
|
message_id = _clean(payload.get("message_id"))
|
|
text = _clean(payload.get("text"))
|
|
if not message_id:
|
|
await websocket.send_json({"type": "error", "error": "message_id is required"})
|
|
return
|
|
if not text:
|
|
await websocket.send_json({"type": "error", "error": "text is required"})
|
|
return
|
|
if len(text) > self.max_message_chars:
|
|
await websocket.send_json(
|
|
{
|
|
"type": "error",
|
|
"error": f"text exceeds max_message_chars ({self.max_message_chars})",
|
|
}
|
|
)
|
|
return
|
|
|
|
thread_id = _clean(payload.get("thread_id")) or connection.thread_id
|
|
user_id = _clean(payload.get("user_id")) or connection.user_id
|
|
identity = ChannelIdentity(
|
|
channel_id=self.channel_id,
|
|
kind=self.kind,
|
|
account_id=self.account_id,
|
|
peer_id=connection.peer_id,
|
|
thread_id=thread_id,
|
|
peer_type="terminal",
|
|
user_id=user_id,
|
|
message_id=message_id,
|
|
)
|
|
inbound = InboundMessage(
|
|
channel=self.channel_id,
|
|
content=text,
|
|
content_type="text",
|
|
user_id=user_id,
|
|
channel_identity=identity,
|
|
metadata={
|
|
"terminal": {
|
|
"peer_id": connection.peer_id,
|
|
"device_name": connection.device_name,
|
|
"capabilities": connection.capabilities,
|
|
}
|
|
},
|
|
)
|
|
accept = await self.inbound_sink.accept_inbound(inbound)
|
|
ack: dict[str, Any] = {
|
|
"type": "ack",
|
|
"message_id": message_id,
|
|
"session_id": accept.session_id or identity.session_id(),
|
|
"accepted": accept.accepted,
|
|
}
|
|
if accept.duplicate:
|
|
ack["duplicate"] = True
|
|
ack["pending"] = accept.pending
|
|
record = accept.record or {}
|
|
if record.get("reply"):
|
|
ack["reply"] = record["reply"]
|
|
if accept.error or record.get("error"):
|
|
ack["error"] = accept.error or record.get("error")
|
|
await websocket.send_json(ack)
|
|
|
|
async def send(self, message: OutboundMessage) -> None:
|
|
session_id = message.session_id
|
|
if not session_id and message.channel_identity is not None:
|
|
session_id = message.channel_identity.session_id()
|
|
connection = self._connections_by_session.get(session_id or "")
|
|
if connection is None:
|
|
message.metadata["delivery_status"] = "unclaimed"
|
|
return
|
|
|
|
payload = {
|
|
"type": "message",
|
|
"role": "assistant",
|
|
"message_id": message.channel_identity.message_id if message.channel_identity else message.message_id,
|
|
"run_id": message.run_id,
|
|
"text": message.content,
|
|
"finish_reason": message.finish_reason,
|
|
}
|
|
try:
|
|
await connection.websocket.send_json(payload)
|
|
except Exception:
|
|
message.metadata["delivery_status"] = "unclaimed"
|
|
self._remove_connection(connection)
|
|
return
|
|
|
|
def _remove_connection(self, connection: TerminalConnection) -> None:
|
|
current = self._connections_by_session.get(connection.session_id)
|
|
if current is connection:
|
|
self._connections_by_session.pop(connection.session_id, None)
|
|
if self._session_by_peer.get(connection.peer_id) == connection.session_id:
|
|
self._session_by_peer.pop(connection.peer_id, None)
|
|
|
|
def _record(
|
|
self,
|
|
*,
|
|
kind: str,
|
|
session_id: str | None = None,
|
|
message_id: str | None = None,
|
|
status: str = "ok",
|
|
error: str | None = None,
|
|
metadata: dict[str, Any] | None = None,
|
|
) -> None:
|
|
if self.event_recorder is None:
|
|
return
|
|
self.event_recorder(
|
|
channel_id=self.channel_id,
|
|
kind=kind,
|
|
session_id=session_id,
|
|
message_id=message_id,
|
|
status=status,
|
|
error=error,
|
|
metadata=metadata,
|
|
)
|
|
```
|
|
|
|
- [ ] **Step 2: Export the adapter**
|
|
|
|
Modify `app-instance/backend/beaver/interfaces/channels/__init__.py` so it imports and exports the adapter:
|
|
|
|
```python
|
|
from .terminal_websocket import TerminalWebSocketAdapter
|
|
```
|
|
|
|
Add `"TerminalWebSocketAdapter"` to `__all__` if this file defines `__all__`.
|
|
|
|
- [ ] **Step 3: Run import-focused tests**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_imports.py -q
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 4: Commit adapter skeleton**
|
|
|
|
```bash
|
|
git add app-instance/backend/beaver/interfaces/channels/terminal_websocket.py app-instance/backend/beaver/interfaces/channels/__init__.py
|
|
git commit -m "feat: add terminal websocket channel adapter"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 3: Wire Adapter Factory, Status, and WebSocket Route
|
|
|
|
**Files:**
|
|
- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py`
|
|
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
|
|
- Modify: `app-instance/nginx.conf`
|
|
|
|
- [ ] **Step 1: Add runtime event recorder and terminal adapter factory**
|
|
|
|
Modify `app-instance/backend/beaver/interfaces/channels/runtime.py`.
|
|
|
|
Add this method to `ChannelRuntime`:
|
|
|
|
```python
|
|
def record_event(
|
|
self,
|
|
*,
|
|
channel_id: str,
|
|
kind: str,
|
|
session_id: str | None = None,
|
|
message_id: str | None = None,
|
|
run_id: str | None = None,
|
|
status: str = "ok",
|
|
error: str | None = None,
|
|
metadata: dict[str, Any] | None = None,
|
|
) -> None:
|
|
self.events.record(
|
|
channel_id=channel_id,
|
|
kind=kind,
|
|
session_id=session_id,
|
|
message_id=message_id,
|
|
run_id=run_id,
|
|
status=status,
|
|
error=error,
|
|
metadata=metadata,
|
|
)
|
|
```
|
|
|
|
Update `_build_adapter()`:
|
|
|
|
```python
|
|
def _build_adapter(self, channel_id: str, cfg: ChannelConfig) -> ChannelAdapter:
|
|
if cfg.kind == "webhook" and cfg.mode == "webhook":
|
|
from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter
|
|
|
|
return GenericWebhookAdapter(
|
|
channel_id=channel_id,
|
|
kind=cfg.kind,
|
|
mode=cfg.mode,
|
|
account_id=cfg.account_id,
|
|
display_name=cfg.display_name,
|
|
inbound_sink=self,
|
|
response_timeout_seconds=float(cfg.config.get("response_timeout_seconds") or 1800),
|
|
)
|
|
|
|
if cfg.kind == "terminal" and cfg.mode == "websocket":
|
|
from beaver.interfaces.channels.terminal_websocket import TerminalWebSocketAdapter
|
|
|
|
return TerminalWebSocketAdapter(
|
|
channel_id=channel_id,
|
|
kind=cfg.kind,
|
|
mode=cfg.mode,
|
|
account_id=cfg.account_id,
|
|
display_name=cfg.display_name,
|
|
inbound_sink=self,
|
|
event_recorder=self.record_event,
|
|
heartbeat_seconds=float(cfg.config.get("heartbeat_seconds") or 30),
|
|
max_message_chars=int(cfg.config.get("max_message_chars") or 20000),
|
|
)
|
|
|
|
raise ValueError(f"Unsupported channel kind/mode: {cfg.kind}/{cfg.mode}")
|
|
```
|
|
|
|
Update `statuses()` so terminal channels expose capabilities, URL, and connected peer count:
|
|
|
|
```python
|
|
capabilities = []
|
|
webhook_url = None
|
|
websocket_url = None
|
|
connected_peers = 0
|
|
if cfg.kind == "webhook":
|
|
capabilities = ["receive_text", "send_text", "sync_webhook_response"]
|
|
webhook_url = f"/api/channels/{channel_id}/webhook"
|
|
elif cfg.kind == "terminal" and cfg.mode == "websocket":
|
|
capabilities = ["receive_text", "send_text", "persistent_connection"]
|
|
websocket_url = f"/api/channels/{channel_id}/ws"
|
|
adapter = self.adapters.get(channel_id)
|
|
if adapter is not None and hasattr(adapter, "status_extra"):
|
|
extra = adapter.status_extra() # type: ignore[attr-defined]
|
|
connected_peers = int(extra.get("connected_peers") or 0)
|
|
```
|
|
|
|
Include these fields in the status item:
|
|
|
|
```python
|
|
"webhook_url": webhook_url,
|
|
"websocket_url": websocket_url,
|
|
"connected_peers": connected_peers,
|
|
```
|
|
|
|
- [ ] **Step 2: Add the FastAPI channel websocket route**
|
|
|
|
Modify `app-instance/backend/beaver/interfaces/web/app.py` near the existing `/api/channels/{channel_id}/webhook` route:
|
|
|
|
```python
|
|
@app.websocket("/api/channels/{channel_id}/ws")
|
|
async def channel_websocket(websocket: WebSocket, channel_id: str) -> None:
|
|
runtime = getattr(websocket.app.state, "channel_runtime", None)
|
|
if not isinstance(runtime, ChannelRuntime):
|
|
await websocket.accept()
|
|
await websocket.send_json({"type": "error", "error": "Channel runtime is not running"})
|
|
await websocket.close(code=1011)
|
|
return
|
|
adapter = runtime.adapters.get(channel_id)
|
|
if adapter is None or not hasattr(adapter, "handle_websocket"):
|
|
await websocket.accept()
|
|
await websocket.send_json({"type": "error", "error": "WebSocket channel not found"})
|
|
await websocket.close(code=1008)
|
|
return
|
|
await adapter.handle_websocket(websocket) # type: ignore[attr-defined]
|
|
```
|
|
|
|
- [ ] **Step 3: Add WebSocket upgrade headers to nginx channel location**
|
|
|
|
Modify `app-instance/nginx.conf`:
|
|
|
|
```nginx
|
|
location /api/channels/ {
|
|
proxy_pass http://127.0.0.1:18080;
|
|
proxy_http_version 1.1;
|
|
proxy_set_header Upgrade $http_upgrade;
|
|
proxy_set_header Connection $connection_upgrade;
|
|
proxy_read_timeout 3600;
|
|
proxy_send_timeout 3600;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run the roundtrip test**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_terminal_websocket_channel.py::test_terminal_websocket_connect_ping_and_message_roundtrip -q
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 5: Run channel runtime tests**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py tests/unit/test_terminal_websocket_channel.py -q
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 6: Commit route and runtime wiring**
|
|
|
|
```bash
|
|
git add app-instance/backend/beaver/interfaces/channels/runtime.py app-instance/backend/beaver/interfaces/web/app.py app-instance/nginx.conf app-instance/backend/tests/unit/test_terminal_websocket_channel.py
|
|
git commit -m "feat: wire terminal websocket channel"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 4: Add Edge Case Tests and Finish Protocol Semantics
|
|
|
|
**Files:**
|
|
- Modify: `app-instance/backend/tests/unit/test_terminal_websocket_channel.py`
|
|
- Modify: `app-instance/backend/beaver/interfaces/channels/terminal_websocket.py`
|
|
|
|
- [ ] **Step 1: Add tests for pre-connect, unknown frame, and validation errors**
|
|
|
|
Append to `app-instance/backend/tests/unit/test_terminal_websocket_channel.py`:
|
|
|
|
```python
|
|
def test_terminal_websocket_rejects_message_before_connect(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "message", "message_id": "m1", "text": "hello"})
|
|
assert websocket.receive_json() == {
|
|
"type": "error",
|
|
"error": "connect is required before message",
|
|
}
|
|
websocket.send_json({"type": "ping"})
|
|
assert websocket.receive_json() == {"type": "pong"}
|
|
|
|
service.close()
|
|
assert service.inbound_calls == []
|
|
|
|
|
|
def test_terminal_websocket_unknown_frame_keeps_connection_open(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "example"})
|
|
assert websocket.receive_json() == {
|
|
"type": "error",
|
|
"error": "Unsupported websocket frame type: example",
|
|
}
|
|
websocket.send_json({"type": "ping"})
|
|
assert websocket.receive_json() == {"type": "pong"}
|
|
|
|
service.close()
|
|
|
|
|
|
def test_terminal_websocket_validates_message_fields(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "connect", "peer_id": "device-001"})
|
|
assert websocket.receive_json()["type"] == "connected"
|
|
|
|
websocket.send_json({"type": "message", "text": "hello"})
|
|
assert websocket.receive_json() == {"type": "error", "error": "message_id is required"}
|
|
|
|
websocket.send_json({"type": "message", "message_id": "m1", "text": " "})
|
|
assert websocket.receive_json() == {"type": "error", "error": "text is required"}
|
|
|
|
service.close()
|
|
assert service.inbound_calls == []
|
|
```
|
|
|
|
- [ ] **Step 2: Add duplicate message test**
|
|
|
|
Append:
|
|
|
|
```python
|
|
def test_terminal_websocket_duplicate_message_returns_cached_reply(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "connect", "peer_id": "device-001"})
|
|
assert websocket.receive_json()["type"] == "connected"
|
|
|
|
frame = {"type": "message", "message_id": "device-001-000001", "text": "hello"}
|
|
websocket.send_json(frame)
|
|
assert websocket.receive_json()["accepted"] is True
|
|
assert websocket.receive_json()["text"] == "echo:hello"
|
|
|
|
websocket.send_json(frame)
|
|
duplicate = websocket.receive_json()
|
|
|
|
service.close()
|
|
assert duplicate["type"] == "ack"
|
|
assert duplicate["accepted"] is False
|
|
assert duplicate["duplicate"] is True
|
|
assert duplicate["pending"] is False
|
|
assert duplicate["reply"] == "echo:hello"
|
|
assert len(service.inbound_calls) == 1
|
|
```
|
|
|
|
- [ ] **Step 3: Add disconnect/unclaimed test**
|
|
|
|
Append:
|
|
|
|
```python
|
|
def test_terminal_websocket_disconnect_before_reply_records_unclaimed(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path, delay_seconds=0.05)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "connect", "peer_id": "device-001"})
|
|
assert websocket.receive_json()["type"] == "connected"
|
|
websocket.send_json({"type": "message", "message_id": "device-001-000001", "text": "slow"})
|
|
assert websocket.receive_json()["accepted"] is True
|
|
|
|
# Allow the runtime bridge to finish after the socket has closed.
|
|
import time
|
|
|
|
time.sleep(0.15)
|
|
events = client.get("/api/channels/terminal-dev/events").json()
|
|
|
|
service.close()
|
|
kinds = [event["kind"] for event in events]
|
|
assert "terminal_disconnected" in kinds
|
|
assert "outbound_unclaimed" in kinds
|
|
```
|
|
|
|
- [ ] **Step 4: Add status exposure test**
|
|
|
|
Append:
|
|
|
|
```python
|
|
def test_terminal_channel_status_exposes_websocket_url_and_peer_count(tmp_path: Path) -> None:
|
|
config_path = write_terminal_config(tmp_path)
|
|
service = TerminalFakeAgentService(config_path=config_path)
|
|
app = create_app(service=service, manage_service_lifecycle=False)
|
|
|
|
with TestClient(app) as client:
|
|
initial = client.get("/api/status").json()["channels"][0]
|
|
assert initial["channel_id"] == "terminal-dev"
|
|
assert initial["websocket_url"] == "/api/channels/terminal-dev/ws"
|
|
assert initial["connected_peers"] == 0
|
|
assert "persistent_connection" in initial["capabilities"]
|
|
|
|
with client.websocket_connect("/api/channels/terminal-dev/ws") as websocket:
|
|
websocket.send_json({"type": "connect", "peer_id": "device-001"})
|
|
assert websocket.receive_json()["type"] == "connected"
|
|
connected = client.get("/api/status").json()["channels"][0]
|
|
assert connected["connected_peers"] == 1
|
|
|
|
service.close()
|
|
```
|
|
|
|
- [ ] **Step 5: Run edge case tests**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_terminal_websocket_channel.py -q
|
|
```
|
|
|
|
Expected: pass. If any test fails, adjust `TerminalWebSocketAdapter` only; do not bypass the runtime path.
|
|
|
|
- [ ] **Step 6: Commit edge case coverage**
|
|
|
|
```bash
|
|
git add app-instance/backend/tests/unit/test_terminal_websocket_channel.py app-instance/backend/beaver/interfaces/channels/terminal_websocket.py
|
|
git commit -m "test: cover terminal websocket edge cases"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 5: Expose Terminal Status in Frontend Types and Dialog
|
|
|
|
**Files:**
|
|
- Modify: `app-instance/frontend/types/index.ts`
|
|
- Modify: `app-instance/frontend/app/(app)/status/page.tsx`
|
|
|
|
- [ ] **Step 1: Extend `ChannelStatus`**
|
|
|
|
Modify `app-instance/frontend/types/index.ts`:
|
|
|
|
```ts
|
|
export interface ChannelStatus {
|
|
channel_id: string;
|
|
name?: string;
|
|
kind: string;
|
|
mode: string;
|
|
display_name: string;
|
|
enabled: boolean;
|
|
state: 'configured' | 'disabled' | 'starting' | 'running' | 'degraded' | 'error' | 'stopped';
|
|
account_id: string;
|
|
last_error?: string | null;
|
|
last_event_at?: string | null;
|
|
started_at?: string | null;
|
|
capabilities: string[];
|
|
webhook_url?: string | null;
|
|
websocket_url?: string | null;
|
|
connected_peers?: number;
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Show WebSocket details in the channel dialog**
|
|
|
|
Modify the selected channel `InfoRow` block in `app-instance/frontend/app/(app)/status/page.tsx`:
|
|
|
|
```tsx
|
|
<InfoRow label="State" value={selectedChannel.state} />
|
|
<InfoRow label="Account" value={selectedChannel.account_id || '-'} />
|
|
<InfoRow label="Webhook" value={selectedChannel.webhook_url || '-'} />
|
|
<InfoRow label="WebSocket" value={selectedChannel.websocket_url || '-'} />
|
|
<InfoRow label="Connected peers" value={String(selectedChannel.connected_peers ?? 0)} />
|
|
<InfoRow label="Last error" value={selectedChannel.last_error || '-'} />
|
|
```
|
|
|
|
- [ ] **Step 3: Show connected peer count in compact channel rows**
|
|
|
|
In the channel row subtitle, replace:
|
|
|
|
```tsx
|
|
{ch.channel_id} · {ch.kind}/{ch.mode} · {ch.account_id}
|
|
```
|
|
|
|
with:
|
|
|
|
```tsx
|
|
{ch.channel_id} · {ch.kind}/{ch.mode} · {ch.account_id}
|
|
{typeof ch.connected_peers === 'number' ? ` · ${ch.connected_peers} peer${ch.connected_peers === 1 ? '' : 's'}` : ''}
|
|
```
|
|
|
|
- [ ] **Step 4: Run frontend verification**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/frontend
|
|
npm run typecheck
|
|
npm test -- --run
|
|
```
|
|
|
|
Expected: both commands pass.
|
|
|
|
- [ ] **Step 5: Commit frontend status updates**
|
|
|
|
```bash
|
|
git add app-instance/frontend/types/index.ts 'app-instance/frontend/app/(app)/status/page.tsx'
|
|
git commit -m "feat: show terminal websocket channel status"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 6: Final Verification and Manual Smoke Notes
|
|
|
|
**Files:**
|
|
- No code files unless verification exposes a defect.
|
|
|
|
- [ ] **Step 1: Run backend terminal/channel tests**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit/test_terminal_websocket_channel.py tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py tests/unit/test_config_loader.py tests/unit/test_imports.py -q
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 2: Run full backend unit tests**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/backend
|
|
uv run pytest tests/unit -q
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 3: Run frontend verification**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
cd app-instance/frontend
|
|
npm run typecheck
|
|
npm test -- --run
|
|
```
|
|
|
|
Expected: pass.
|
|
|
|
- [ ] **Step 4: Check nginx syntax if nginx is installed**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
nginx -t -c /home/ivan/xuan/beaver_project/app-instance/nginx.conf
|
|
```
|
|
|
|
Expected when nginx is installed: syntax is ok. If the command is missing, record that nginx is not installed and do not claim nginx syntax was verified.
|
|
|
|
- [ ] **Step 5: Check for generated runtime state**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
find app-instance/backend/state -maxdepth 3 -type f -print 2>/dev/null || true
|
|
```
|
|
|
|
Expected: no test-generated state files remain. Remove only generated test state files if they appear under `app-instance/backend/state`.
|
|
|
|
- [ ] **Step 6: Check diff hygiene**
|
|
|
|
Run:
|
|
|
|
```bash
|
|
git diff --check
|
|
git status --short
|
|
```
|
|
|
|
Expected: `git diff --check` passes. `git status --short` shows only intentional terminal websocket/channel runtime changes.
|
|
|
|
- [ ] **Step 7: Record manual smoke command for terminal colleagues**
|
|
|
|
Use this command after a local server is running:
|
|
|
|
```bash
|
|
websocat ws://127.0.0.1:8080/api/channels/terminal-dev/ws
|
|
```
|
|
|
|
Paste:
|
|
|
|
```json
|
|
{"type":"connect","peer_id":"device-001","device_name":"desk-terminal","capabilities":["text"]}
|
|
```
|
|
|
|
Expected:
|
|
|
|
```json
|
|
{"type":"connected","channel_id":"terminal-dev","session_id":"terminal-dev:local:device-001"}
|
|
```
|
|
|
|
Paste:
|
|
|
|
```json
|
|
{"type":"message","message_id":"device-001-000001","text":"hello"}
|
|
```
|
|
|
|
Expected:
|
|
|
|
```json
|
|
{"type":"ack","message_id":"device-001-000001","session_id":"terminal-dev:local:device-001","accepted":true}
|
|
```
|
|
|
|
Then expect a final assistant message:
|
|
|
|
```json
|
|
{"type":"message","role":"assistant","message_id":"device-001-000001","run_id":"...","text":"...","finish_reason":"stop"}
|
|
```
|
|
|
|
- [ ] **Step 8: Final commit if needed**
|
|
|
|
If Tasks 1-5 were not committed separately, make one scoped commit:
|
|
|
|
```bash
|
|
git add app-instance/backend/beaver/interfaces/channels/terminal_websocket.py \
|
|
app-instance/backend/beaver/interfaces/channels/__init__.py \
|
|
app-instance/backend/beaver/interfaces/channels/runtime.py \
|
|
app-instance/backend/beaver/interfaces/web/app.py \
|
|
app-instance/backend/tests/unit/test_terminal_websocket_channel.py \
|
|
app-instance/frontend/types/index.ts \
|
|
'app-instance/frontend/app/(app)/status/page.tsx' \
|
|
app-instance/nginx.conf
|
|
git commit -m "feat: add terminal websocket channel"
|
|
```
|
|
|
|
Do not commit unrelated user changes.
|
|
|
|
---
|
|
|
|
## Spec Coverage Checklist
|
|
|
|
- Text-only WebSocket endpoint: Tasks 2 and 3.
|
|
- Bus-first routing through `ChannelRuntime.accept_inbound()`: Tasks 1, 2, and 3.
|
|
- Connect/connected/message/ack/assistant frames: Tasks 1 and 2.
|
|
- Ping/pong and error frames: Tasks 1 and 4.
|
|
- Stable session id from `peer_id`: Tasks 1 and 2.
|
|
- Dedupe behavior: Task 4.
|
|
- Disconnect/unclaimed behavior: Task 4.
|
|
- Status fields and events: Tasks 3, 4, and 5.
|
|
- Nginx WebSocket upgrade: Task 3.
|
|
- Verification and manual smoke command: Task 6.
|