# Channel Runtime V1 Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build Beaver Channel Runtime v1: a local-configured, bus-first channel runtime with a generic text webhook reference adapter, stable channel identity/session mapping, dedupe, delivery/event logs, `/status` visibility, and a restart-instance control.

**Architecture:** Channels are runtime integrations hosted inside the Web backend process. Every inbound message must pass through `MessageBus.inbound`, the gateway bridge, `AgentService.handle_inbound_message()`, `MessageBus.outbound`, `ChannelManager.dispatch_outbound()`, and adapter `send()`. Adapter code never calls `AgentService` directly and never publishes directly to the bus.

**Tech Stack:** FastAPI/Python backend, dataclasses, local JSON/JSONL state under workspace, Next.js 13 app router, React, TypeScript, Vitest, pytest.

---

## Locked Decisions

- Channel v1 does not use AuthZ, OAuth, token introspection, permission scopes, pairing, allowlists, or platform signature validation.
- Channel config lives in local Beaver config under `BeaverConfig.channels`.
- All channel inbound messages run as chat/direct turns. Channel v1 does not create Tasks and does not support task prefixes.
- Session ids are generated only by the runtime from `ChannelIdentity`:

```text
<channel_id>:<account_id>:<peer_id>[:<thread_id>]
```

- `channel_id`, `kind`, and `account_id` are separate concepts:
  - `channel_id`: Beaver adapter instance id, for example `webhook-dev`.
  - `kind`: adapter/platform kind, for example `webhook`.
  - `account_id`: stable platform account/workspace/bot/phone identity.
- `ChannelIdentity.channel_id` is the canonical field. Existing `InboundMessage.channel` and `OutboundMessage.channel` remain for compatibility, but their semantic value is `channel_id`.
- Adapters do not hold `MessageBus`. They submit inbound messages through `ChannelInboundSink.accept_inbound()`.
- All enabled channels start automatically with the Web backend. Disabled channels are listed as disabled and are not instantiated.
- Config changes require a backend restart in v1. No hot reload.
- `/status` gets a compact Channels section with details on demand. No independent channel configuration page in v1.
- Channel config UI is not included. Config is hand-edited in local config.
- V1 implements only a generic fixed-schema text webhook reference adapter. No Telegram, Slack, WhatsApp, email, or Outlook channel adapter in v1.
- The generic webhook waits synchronously for the final reply through the bus-first path. It defaults to a 30-minute response timeout. A timeout returns `202 accepted` and does not cancel the backend run.
- A late outbound message that arrives after the generic webhook request has timed out is recorded as `outbound_unclaimed`.
- The dedupe result cache is persisted locally, defaults to 48 hours of retention, and stores cached replies of up to 20000 characters.
- Delivery failures are recorded but not retried automatically.
- Cron proactive channel delivery is not included in v1. Existing cron `deliver/channel/to` fields remain reserved.
- Streaming, typing indicators, tool progress, intermediate messages, attachment handling, multimedia, hot reload, config UI, and real platform adapters are v2 or later.
- `/status` restores a restart-instance button with confirmation. Restart uses local backend self-restart through `POST /api/runtime/restart`; it is enabled by default and can be disabled with `BEAVER_ENABLE_SELF_RESTART=0`.

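The timeout decision above (return `202` without cancelling the run) can be sketched with `asyncio.shield()`. This is a minimal illustration under stated assumptions, not the plan's implementation: `wait_for_reply`, `demo`, the `status_code` key, and the `200` success code are hypothetical names and values chosen for the example.

```python
import asyncio


async def wait_for_reply(run: asyncio.Task, timeout_seconds: float) -> dict:
    """Wait for the run's reply; on timeout report pending without cancelling it."""
    try:
        # shield() stops wait_for() from cancelling the underlying run on timeout.
        reply = await asyncio.wait_for(asyncio.shield(run), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return {"status_code": 202, "ok": True, "pending": True}
    # Assumption: a completed reply maps to 200; the plan only fixes the 202 case.
    return {"status_code": 200, "ok": True, "pending": False, "reply": reply}


async def demo() -> tuple:
    async def slow_run() -> str:  # hypothetical stand-in for a backend run
        await asyncio.sleep(0.05)
        return "assistant response"

    run = asyncio.create_task(slow_run())
    response = await wait_for_reply(run, timeout_seconds=0.01)
    still_running = not run.done()
    late_reply = await run  # arrives after the request gave up
    return response, still_running, late_reply
```

In this sketch the late reply is simply awaited; in v1 that late outbound would be the one recorded as `outbound_unclaimed`.
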
## File Structure

- Modify: `app-instance/backend/beaver/foundation/config/schema.py`
  - Add `ChannelConfig` and `channels: dict[str, ChannelConfig]`.
- Modify: `app-instance/backend/beaver/foundation/config/loader.py`
  - Parse `channels` from local config.
- Modify: `app-instance/backend/tests/unit/test_config_loader.py`
  - Cover channel config parsing.
- Modify: `app-instance/backend/beaver/foundation/events/message_bus.py`
  - Add `ChannelIdentity`, add optional `channel_identity` to inbound/outbound messages, and add content type fields.
- Modify: `app-instance/backend/beaver/engine/context/builder.py`
  - Extend `SessionContext` with normalized channel identity fields and render them into model-visible context.
- Modify: `app-instance/backend/beaver/engine/loop.py`
  - Pass channel identity fields into `SessionContext`.
- Modify: `app-instance/backend/beaver/services/agent_service.py`
  - Preserve `channel_identity` on outbound, pass normalized channel context to direct runs, and keep all channel runs in chat/direct mode.
- Modify: `app-instance/backend/beaver/interfaces/channels/base.py`
  - Replace the bus-owning adapter protocol with channel id, kind, mode, lifecycle, and outbound send.
- Modify: `app-instance/backend/beaver/interfaces/channels/manager.py`
  - Dispatch by `channel.channel_id`; adapters no longer expose a bus.
- Modify: `app-instance/backend/beaver/interfaces/channels/memory.py`
  - Update the test adapter to the new identity/session model without the old two-segment session ids.
- Create: `app-instance/backend/beaver/interfaces/channels/state.py`
  - Persistent dedupe store and channel event log.
- Create: `app-instance/backend/beaver/interfaces/channels/generic_webhook.py`
  - Fixed-schema generic text webhook reference adapter.
- Create: `app-instance/backend/beaver/interfaces/channels/runtime.py`
  - `ChannelRuntime`, adapter factory, inbound bridge, outbound dispatch lifecycle, status and events API helpers.
- Modify: `app-instance/backend/beaver/interfaces/gateway/main.py`
  - Turn `run_gateway()` into a compatibility wrapper around `ChannelRuntime`.
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
  - Start/stop `ChannelRuntime`, add channel routes, expose channel status/events, add runtime restart API, include runtime controls in `/api/status`.
- Modify: `app-instance/nginx.conf`
  - Add a long proxy timeout for the channel webhook API path.
- Modify: `app-instance/create-instance.sh`
  - Set `BEAVER_ENABLE_SELF_RESTART=1` in created containers.
- Modify: `app-instance/frontend/types/index.ts`
  - Expand channel and runtime control types.
- Modify: `app-instance/frontend/lib/api.ts`
  - Add channel event and restart API helpers.
- Modify: `app-instance/frontend/app/(app)/status/page.tsx`
  - Replace hard-coded channel chips with a compact runtime channel table, details dialog, and restart confirmation button.
- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py`
  - Update existing tests to the new bus-first runtime semantics and add generic webhook roundtrip coverage.
- Create: `app-instance/backend/tests/unit/test_channel_runtime.py`
  - Unit tests for identity/session mapping, dedupe, event log, adapter factory, and restart guard behavior.
- Modify/Create frontend tests as needed:
  - `app-instance/frontend/lib/api` and status page behavior should be covered through the existing Vitest setup, or through a focused component test if the test harness supports it.

## Data Contracts

### Local Config Shape

```json
{
  "channels": {
    "webhook-dev": {
      "enabled": true,
      "kind": "webhook",
      "mode": "webhook",
      "accountId": "local",
      "displayName": "Webhook Dev",
      "config": {
        "responseTimeoutSeconds": 1800,
        "dedupeRetentionHours": 48,
        "maxCachedReplyChars": 20000,
        "maxCachedErrorChars": 4000
      },
      "secrets": {}
    }
  }
}
```

### Generic Webhook Request

```json
{
  "peer_id": "demo-user",
  "thread_id": "main",
  "message_id": "msg-001",
  "text": "hello",
  "peer_type": "dm",
  "user_id": "optional-user"
}
```

`channel_id`, `kind`, and `account_id` are read from config. The request body is not trusted for those fields.

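One way to enforce the rule that the request body is not trusted for `channel_id`, `kind`, or `account_id` is to allowlist body fields and then overwrite the routing fields from config. The sketch below is illustrative only: `ConfiguredChannel` and `routing_fields` are hypothetical names, not part of the plan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConfiguredChannel:  # hypothetical stand-in for one entry in BeaverConfig.channels
    channel_id: str
    kind: str
    account_id: str


def routing_fields(channel: ConfiguredChannel, body: dict) -> dict:
    """Merge request fields with config-owned fields; config always wins."""
    allowed = ("peer_id", "thread_id", "message_id", "text", "peer_type", "user_id")
    # Only allowlisted keys survive, so a spoofed channel_id in the body is dropped.
    fields = {key: body[key] for key in allowed if key in body}
    fields.update(
        channel_id=channel.channel_id,
        kind=channel.kind,
        account_id=channel.account_id,
    )
    return fields
```

With this shape, a body that includes `"channel_id": "spoofed"` still routes as the configured `webhook-dev` instance.
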
### Generic Webhook Response

Success:

```json
{
  "ok": true,
  "duplicate": false,
  "pending": false,
  "session_id": "webhook-dev:local:demo-user:main",
  "run_id": "run-1",
  "reply": "assistant response"
}
```

Processing duplicate:

```json
{
  "ok": true,
  "duplicate": true,
  "pending": true,
  "session_id": "webhook-dev:local:demo-user:main"
}
```

Completed duplicate:

```json
{
  "ok": true,
  "duplicate": true,
  "pending": false,
  "session_id": "webhook-dev:local:demo-user:main",
  "run_id": "run-1",
  "reply": "assistant response"
}
```

Timed out waiting for reply:

```json
{
  "ok": true,
  "duplicate": false,
  "pending": true,
  "session_id": "webhook-dev:local:demo-user:main"
}
```

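The four response shapes above differ only in `duplicate`, `pending`, and whether `run_id`/`reply` are present. A minimal sketch of that mapping follows, assuming a dedupe record shaped like a dict with `status`, `run_id`, and `reply` keys; `webhook_response` is a hypothetical helper, and error records are deliberately left out because the contract above does not define them.

```python
from typing import Any, Optional


def webhook_response(session_id: str, *, duplicate: bool, record: Optional[dict]) -> dict:
    """Build a response body from an optional dedupe record."""
    body: dict[str, Any] = {
        "ok": True,
        "duplicate": duplicate,
        "pending": True,  # default: still processing, or timed out waiting
        "session_id": session_id,
    }
    if record is not None and record.get("status") == "done":
        # Only a finished run carries run_id and reply.
        body.update(pending=False, run_id=record.get("run_id"), reply=record.get("reply"))
    return body
```

Passing `record=None` models the timed-out case, where no final result is available yet.
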
### Channel Status

```json
{
  "channel_id": "webhook-dev",
  "kind": "webhook",
  "mode": "webhook",
  "display_name": "Webhook Dev",
  "enabled": true,
  "state": "running",
  "account_id": "local",
  "last_error": null,
  "last_event_at": "2026-06-01T10:00:00Z",
  "started_at": "2026-06-01T09:59:00Z",
  "capabilities": ["receive_text", "send_text", "sync_webhook_response"],
  "webhook_url": "/api/channels/webhook-dev/webhook"
}
```

Allowed states:

```text
configured
disabled
starting
running
degraded
error
stopped
```

### Channel Events

Minimum event kinds:

```text
adapter_started
adapter_stopped
webhook_received
inbound_accepted
inbound_duplicate
inbound_rejected
session_id_normalized
direct_run_started
direct_run_finished
direct_run_failed
outbound_delivered
outbound_delivery_failed
outbound_unclaimed
webhook_response_timeout
adapter_error
```

Event log entries do not store full user message content or raw webhook payloads; they carry only a short `text_preview` and the `text_length`.

```json
{
  "event_id": "evt-1",
  "channel_id": "webhook-dev",
  "kind": "inbound_accepted",
  "session_id": "webhook-dev:local:demo-user:main",
  "message_id": "msg-001",
  "run_id": null,
  "status": "ok",
  "error": null,
  "text_preview": "hello",
  "text_length": 5,
  "created_at": "2026-06-01T10:00:00Z"
}
```

## Task 1: Channel Config Schema

**Files:**
- Modify: `app-instance/backend/beaver/foundation/config/schema.py`
- Modify: `app-instance/backend/beaver/foundation/config/loader.py`
- Modify: `app-instance/backend/tests/unit/test_config_loader.py`

- [ ] **Step 1: Write failing config loader test**

Add this test to `app-instance/backend/tests/unit/test_config_loader.py`:

```python
def test_config_loader_reads_channels(tmp_path) -> None:
    config_path = tmp_path / "config.json"
    config_path.write_text(
        json.dumps(
            {
                "agents": {"defaults": {"model": "openai/gpt-5"}},
                "channels": {
                    "webhook-dev": {
                        "enabled": True,
                        "kind": "webhook",
                        "mode": "webhook",
                        "accountId": "local",
                        "displayName": "Webhook Dev",
                        "config": {
                            "responseTimeoutSeconds": 1800,
                            "dedupeRetentionHours": 48,
                        },
                        "secrets": {"ignored_for_status": "secret-value"},
                    }
                },
            }
        ),
        encoding="utf-8",
    )

    config = load_config(config_path=config_path)

    channel = config.channels["webhook-dev"]
    assert channel.enabled is True
    assert channel.kind == "webhook"
    assert channel.mode == "webhook"
    assert channel.account_id == "local"
    assert channel.display_name == "Webhook Dev"
    assert channel.config["response_timeout_seconds"] == 1800
    assert channel.config["dedupe_retention_hours"] == 48
    assert channel.secrets == {"ignored_for_status": "secret-value"}
```

- [ ] **Step 2: Run failing test**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q
```

Expected: fail because `BeaverConfig` has no `channels` field.

- [ ] **Step 3: Add `ChannelConfig` to config schema**

In `app-instance/backend/beaver/foundation/config/schema.py`, add:

```python
@dataclass(slots=True)
class ChannelConfig:
    """One configured channel adapter instance."""

    enabled: bool = False
    kind: str = ""
    mode: str = "webhook"
    account_id: str = ""
    display_name: str = ""
    config: dict[str, Any] = field(default_factory=dict)
    secrets: dict[str, str] = field(default_factory=dict)
```

Then add this field to `BeaverConfig`:

```python
channels: dict[str, ChannelConfig] = field(default_factory=dict)
```

- [ ] **Step 4: Parse channel config**

In `app-instance/backend/beaver/foundation/config/loader.py`, add helper functions near the other config parsing helpers:

```python
def _camel_to_snake_key(value: str) -> str:
    # Insert "_" before every uppercase letter except a leading one.
    result: list[str] = []
    for char in value:
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


def _normalize_config_map(value: Any) -> dict[str, Any]:
    if not isinstance(value, dict):
        return {}
    return {
        _camel_to_snake_key(str(key)): item
        for key, item in value.items()
        if str(key).strip()
    }


def _parse_channel_config(payload: Any) -> ChannelConfig:
    if not isinstance(payload, dict):
        return ChannelConfig()
    secrets = payload.get("secrets", {})
    if not isinstance(secrets, dict):
        secrets = {}
    return ChannelConfig(
        enabled=bool(payload.get("enabled", False)),
        kind=_string(payload.get("kind")),
        mode=_string(payload.get("mode")) or "webhook",
        account_id=_string(payload.get("accountId") or payload.get("account_id")),
        display_name=_string(payload.get("displayName") or payload.get("display_name")),
        config=_normalize_config_map(payload.get("config")),
        secrets={str(key): str(value) for key, value in secrets.items() if str(key).strip()},
    )
```

Import `ChannelConfig` from schema, then wire it into `load_config()`. Check the payload type before iterating, so a non-dict `channels` value yields an empty mapping instead of raising `AttributeError` on `.items()`:

```python
channels_payload = data.get("channels", {})
if not isinstance(channels_payload, dict):
    channels_payload = {}
channels = {
    str(channel_id): _parse_channel_config(payload)
    for channel_id, payload in channels_payload.items()
    if str(channel_id).strip()
}
```

Pass `channels=channels` into `BeaverConfig(...)`.

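To see what the key normalization in this step produces, the helper's logic can be exercised on its own. The function below repeats the `_camel_to_snake_key` body under a public name purely so the example is self-contained.

```python
def camel_to_snake_key(value: str) -> str:
    """Same logic as the `_camel_to_snake_key` helper in this step."""
    result: list[str] = []
    for char in value:
        # Every uppercase letter after the first character starts a new segment.
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


examples = {
    key: camel_to_snake_key(key)
    for key in ("responseTimeoutSeconds", "dedupeRetentionHours", "already_snake", "webhookURL")
}
```

Note the last case: because each uppercase letter is split individually, an acronym such as `webhookURL` becomes `webhook_u_r_l`. Config keys that follow the plain camelCase style used in the local config shape are unaffected.
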
- [ ] **Step 5: Verify config test passes**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q
```

Expected: pass.

## Task 2: Channel Identity and Message Models

**Files:**
- Modify: `app-instance/backend/beaver/foundation/events/message_bus.py`
- Modify: `app-instance/backend/beaver/engine/context/builder.py`
- Modify: `app-instance/backend/beaver/engine/loop.py`
- Modify: `app-instance/backend/beaver/services/agent_service.py`
- Modify: `app-instance/backend/tests/unit/test_imports.py`
- Create or update: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Write failing identity tests**

Create `app-instance/backend/tests/unit/test_channel_runtime.py` with:

```python
from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage


def test_channel_identity_builds_stable_session_id() -> None:
    identity = ChannelIdentity(
        channel_id="webhook-dev",
        kind="webhook",
        account_id="local",
        peer_id="demo-user",
        thread_id="main",
        peer_type="dm",
        message_id="msg-1",
    )

    assert identity.session_id() == "webhook-dev:local:demo-user:main"
    assert identity.dedupe_key() == "webhook-dev:local:demo-user:main:msg-1"


def test_channel_identity_requires_routing_fields() -> None:
    identity = ChannelIdentity(channel_id="webhook-dev", kind="webhook", account_id="", peer_id="demo")

    assert identity.validation_error() == "account_id is required"


def test_messages_carry_channel_identity() -> None:
    identity = ChannelIdentity(
        channel_id="webhook-dev",
        kind="webhook",
        account_id="local",
        peer_id="demo-user",
        message_id="msg-1",
    )

    inbound = InboundMessage(channel="webhook-dev", content="hello", channel_identity=identity)
    outbound = OutboundMessage(
        channel="webhook-dev",
        content="ok",
        session_id=identity.session_id(),
        finish_reason="stop",
        channel_identity=identity,
    )

    assert inbound.channel_identity is identity
    assert outbound.channel_identity is identity
```

- [ ] **Step 2: Run failing tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_identity_builds_stable_session_id tests/unit/test_channel_runtime.py::test_channel_identity_requires_routing_fields tests/unit/test_channel_runtime.py::test_messages_carry_channel_identity -q
```

Expected: fail because `ChannelIdentity` does not exist.

- [ ] **Step 3: Implement identity model**

In `app-instance/backend/beaver/foundation/events/message_bus.py`, add:

```python
@dataclass(slots=True)
class ChannelIdentity:
    """Normalized channel routing identity.

    channel_id is the Beaver adapter instance id, not the platform kind.
    """

    channel_id: str
    kind: str
    account_id: str
    peer_id: str
    thread_id: str | None = None
    peer_type: str = "unknown"
    user_id: str | None = None
    message_id: str | None = None

    def validation_error(self) -> str | None:
        if not self.channel_id.strip():
            return "channel_id is required"
        if not self.account_id.strip():
            return "account_id is required"
        if not self.peer_id.strip():
            return "peer_id is required"
        return None

    def session_id(self) -> str:
        parts = [self.channel_id, self.account_id, self.peer_id]
        if self.thread_id:
            parts.append(self.thread_id)
        return ":".join(_clean_session_part(part) for part in parts)

    def dedupe_key(self) -> str | None:
        if not self.message_id:
            return None
        return f"{self.session_id()}:{_clean_session_part(self.message_id)}"


def _clean_session_part(value: str) -> str:
    cleaned = str(value).strip()
    if not cleaned:
        return "unknown"
    return cleaned.replace(":", "_")
```

Add fields to `InboundMessage`:

```python
content_type: str = "text"
channel_identity: ChannelIdentity | None = None
```

Add fields to `OutboundMessage`:

```python
content_type: str = "text"
channel_identity: ChannelIdentity | None = None
```

Update `app-instance/backend/beaver/foundation/events/__init__.py` to export `ChannelIdentity`.

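The session id rule in this step is easiest to check on a few concrete inputs. The functions below restate the `session_id()` and `_clean_session_part()` logic as free functions so the example runs on its own; the names `clean_part` and `build_session_id` are hypothetical.

```python
from typing import Optional


def clean_part(value: str) -> str:
    """Trim a segment, replace ':' so it cannot split the id, default to 'unknown'."""
    cleaned = str(value).strip()
    return cleaned.replace(":", "_") if cleaned else "unknown"


def build_session_id(
    channel_id: str, account_id: str, peer_id: str, thread_id: Optional[str] = None
) -> str:
    parts = [channel_id, account_id, peer_id]
    if thread_id:  # the thread segment is optional
        parts.append(thread_id)
    return ":".join(clean_part(part) for part in parts)


with_thread = build_session_id("webhook-dev", "local", "demo-user", "main")
escaped = build_session_id("webhook-dev", "local", "team:42")
underscored = build_session_id("webhook-dev", "local", "team_42")
```

One consequence worth knowing: because `:` is rewritten to `_`, the peers `team:42` and `team_42` map to the same session id. If a platform uses both characters in peer ids, a different escaping scheme would be needed.
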
- [ ] **Step 4: Preserve identity in AgentService outbound**

In `AgentService.build_outbound_message()`, add:

```python
channel_identity=inbound.channel_identity,
content_type=inbound.content_type,
```

In `AgentService.build_outbound_error()`, add the same two fields.

- [ ] **Step 5: Pass channel context into direct runs**

In `AgentService.handle_inbound_message()`, pass normalized channel fields:

```python
channel_identity = inbound.channel_identity
result = await self.submit_direct(
    inbound.content,
    session_id=inbound.session_id,
    source=f"gateway:{inbound.channel}",
    user_id=inbound.user_id or (channel_identity.user_id if channel_identity else None),
    title=inbound.title,
    execution_context=inbound.execution_context,
    model=inbound.model,
    provider_name=inbound.provider_name,
    embedding_model=inbound.embedding_model,
    channel_identity=channel_identity,
)
```

Thread the new `channel_identity` keyword from `submit_direct()` down to `AgentLoop._process_direct_impl()`, which gains a keyword parameter:

```python
channel_identity: ChannelIdentity | None = None,
```

Import `ChannelIdentity`. When constructing `SessionContext`, include:

```python
channel=channel_identity.channel_id if channel_identity else None,
channel_kind=channel_identity.kind if channel_identity else None,
account_id=channel_identity.account_id if channel_identity else None,
peer_id=channel_identity.peer_id if channel_identity else None,
peer_type=channel_identity.peer_type if channel_identity else None,
chat_id=channel_identity.peer_id if channel_identity else None,
thread_id=channel_identity.thread_id if channel_identity else None,
```

- [ ] **Step 6: Extend `SessionContext` rendering**

In `app-instance/backend/beaver/engine/context/builder.py`, add fields:

```python
channel_kind: str | None = None
account_id: str | None = None
peer_id: str | None = None
peer_type: str | None = None
thread_id: str | None = None
```

In `_render_session_section()`, add:

```python
if session_context.channel_kind:
    rows.append(f"Channel Kind: {session_context.channel_kind}")
if session_context.account_id:
    rows.append(f"Account ID: {session_context.account_id}")
if session_context.peer_id:
    rows.append(f"Peer ID: {session_context.peer_id}")
if session_context.peer_type:
    rows.append(f"Peer Type: {session_context.peer_type}")
if session_context.thread_id:
    rows.append(f"Thread ID: {session_context.thread_id}")
```

Do not render `raw_channel_payload`.

- [ ] **Step 7: Verify identity tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_imports.py -q
```

Expected: pass after updating import assertions if needed.

## Task 3: Dedupe Store and Event Log

**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/state.py`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Add failing state tests**

Append to `test_channel_runtime.py`:

```python
from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog


def test_dedupe_store_tracks_processing_and_done(tmp_path) -> None:
    store = ChannelDedupeStore(tmp_path / "dedupe.json", retention_hours=48)

    created = store.mark_processing(
        dedupe_key="webhook-dev:local:demo:msg-1",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
    )
    duplicate = store.mark_processing(
        dedupe_key="webhook-dev:local:demo:msg-1",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
    )

    assert created.created is True
    assert duplicate.created is False
    assert duplicate.record is not None
    assert duplicate.record["status"] == "processing"

    store.mark_done(
        dedupe_key="webhook-dev:local:demo:msg-1",
        run_id="run-1",
        reply="hello" * 10000,
        max_reply_chars=20,
    )

    done = store.get("webhook-dev:local:demo:msg-1")
    assert done is not None
    assert done["status"] == "done"
    assert done["reply"] == "hellohellohellohello"


def test_channel_event_log_writes_recent_events(tmp_path) -> None:
    log = ChannelEventLog(tmp_path / "events.jsonl")
    log.record(
        channel_id="webhook-dev",
        kind="inbound_accepted",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
        status="ok",
        text="hello world",
    )

    events = log.recent(channel_id="webhook-dev", limit=10)

    assert len(events) == 1
    assert events[0]["kind"] == "inbound_accepted"
    assert events[0]["text_preview"] == "hello world"
    assert "raw_channel_payload" not in events[0]
```

- [ ] **Step 2: Run failing state tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q
```

Expected: fail because `state.py` does not exist.

- [ ] **Step 3: Implement `state.py`**

Create `app-instance/backend/beaver/interfaces/channels/state.py` with:

```python
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any
from uuid import uuid4


def _now_ms() -> int:
    return int(time.time() * 1000)


def _iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(slots=True)
class DedupeWriteResult:
    created: bool
    record: dict[str, Any] | None = None


class ChannelDedupeStore:
    def __init__(self, path: Path, *, retention_hours: int = 48) -> None:
        self.path = path
        # Retention is clamped to at least one hour.
        self.retention_ms = max(1, int(retention_hours)) * 60 * 60 * 1000
        self._lock = Lock()

    def _load(self) -> dict[str, Any]:
        # A missing, unreadable, or malformed file is treated as an empty store.
        if not self.path.exists():
            return {"records": {}}
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {"records": {}}
        return data if isinstance(data, dict) and isinstance(data.get("records"), dict) else {"records": {}}

    def _save(self, data: dict[str, Any]) -> None:
        # Write to a temp file, then replace, so readers never see a partial file.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)

    def _prune_unlocked(self, data: dict[str, Any], now_ms: int) -> None:
        # Caller must hold self._lock.
        records = data.get("records", {})
        expired_before = now_ms - self.retention_ms
        for key, record in list(records.items()):
            updated_at_ms = int(record.get("updated_at_ms") or record.get("created_at_ms") or 0)
            if updated_at_ms < expired_before:
                records.pop(key, None)

    def get(self, dedupe_key: str) -> dict[str, Any] | None:
        with self._lock:
            data = self._load()
            self._prune_unlocked(data, _now_ms())
            return data["records"].get(dedupe_key)

    def mark_processing(self, *, dedupe_key: str, session_id: str, message_id: str) -> DedupeWriteResult:
        with self._lock:
            data = self._load()
            now_ms = _now_ms()
            self._prune_unlocked(data, now_ms)
            existing = data["records"].get(dedupe_key)
            if existing is not None:
                # Persist the pruned store; the existing record itself is unchanged.
                self._save(data)
                return DedupeWriteResult(created=False, record=existing)
            record = {
                "dedupe_key": dedupe_key,
                "status": "processing",
                "session_id": session_id,
                "message_id": message_id,
                "run_id": None,
                "reply": None,
                "error": None,
                "created_at_ms": now_ms,
                "updated_at_ms": now_ms,
            }
            data["records"][dedupe_key] = record
            self._save(data)
            return DedupeWriteResult(created=True, record=record)

    def mark_done(self, *, dedupe_key: str, run_id: str | None, reply: str, max_reply_chars: int) -> None:
        self._mark_result(
            dedupe_key=dedupe_key,
            status="done",
            run_id=run_id,
            reply=reply[: max(0, int(max_reply_chars))],
            error=None,
        )

    def mark_error(self, *, dedupe_key: str, error: str, max_error_chars: int) -> None:
        self._mark_result(
            dedupe_key=dedupe_key,
            status="error",
            run_id=None,
            reply=None,
            error=error[: max(0, int(max_error_chars))],
        )

    def _mark_result(
        self,
        *,
        dedupe_key: str,
        status: str,
        run_id: str | None,
        reply: str | None,
        error: str | None,
    ) -> None:
        with self._lock:
            data = self._load()
            record = data["records"].get(dedupe_key)
            if record is None:
                record = {"dedupe_key": dedupe_key, "created_at_ms": _now_ms()}
                data["records"][dedupe_key] = record
            record.update(
                {
                    "status": status,
                    "run_id": run_id,
                    "reply": reply,
                    "error": error,
                    "updated_at_ms": _now_ms(),
                }
            )
            self._save(data)


class ChannelEventLog:
    def __init__(self, path: Path) -> None:
        self.path = path
        self._lock = Lock()

    def record(
        self,
        *,
        channel_id: str,
        kind: str,
        session_id: str | None = None,
        message_id: str | None = None,
        run_id: str | None = None,
        status: str = "ok",
        error: str | None = None,
        text: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        entry = {
            "event_id": uuid4().hex,
            "channel_id": channel_id,
            "kind": kind,
            "session_id": session_id,
            "message_id": message_id,
            "run_id": run_id,
            "status": status,
            "error": error,
            # Only a 120-character preview is stored, never the full text.
            "text_preview": text[:120] if text else None,
            "text_length": len(text) if text else 0,
            "metadata": metadata or {},
            "created_at": _iso_now(),
        }
        with self._lock:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("a", encoding="utf-8") as handle:
                handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
        return entry

    def recent(self, *, channel_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]:
        if not self.path.exists():
            return []
        lines = self.path.read_text(encoding="utf-8").splitlines()
        items: list[dict[str, Any]] = []
        # Walk newest-first, then restore chronological order before returning.
        for line in reversed(lines):
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if channel_id and item.get("channel_id") != channel_id:
                continue
            items.append(item)
            if len(items) >= max(1, int(limit)):
                break
        return list(reversed(items))
```

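The pruning rule in `_prune_unlocked()` comes down to one comparison in milliseconds. The sketch below isolates that arithmetic; `HOUR_MS` and `is_expired` are hypothetical names introduced only for this illustration.

```python
HOUR_MS = 60 * 60 * 1000


def is_expired(updated_at_ms: int, now_ms: int, retention_hours: int = 48) -> bool:
    """Mirror the store's rule: a record expires once it is older than the retention window."""
    # Same clamp as ChannelDedupeStore.__init__: retention is never below one hour.
    retention_ms = max(1, int(retention_hours)) * HOUR_MS
    return updated_at_ms < now_ms - retention_ms


now_ms = 1_000_000_000_000  # arbitrary fixed timestamp for the example
window_ms = 48 * HOUR_MS    # 172_800_000 ms for the default 48 hours
```

The comparison is strict, so a record updated exactly 48 hours ago is still kept and is dropped one millisecond later. A configured retention of `0` behaves like one hour because of the clamp.
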
- [ ] **Step 4: Verify state tests pass**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q
```

Expected: pass.

## Task 4: Adapter Protocol and ChannelManager Migration

**Files:**
- Modify: `app-instance/backend/beaver/interfaces/channels/base.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/manager.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/memory.py`
- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py`
- Modify: `app-instance/backend/tests/unit/test_imports.py`

- [ ] **Step 1: Write failing manager tests for new protocol**

Update the manager-focused parts of `test_gateway_channels.py` to use `channel_id` instead of `name` and to drop the adapter-owned bus:

```python
def test_channel_manager_dispatches_by_channel_id() -> None:
    class CaptureChannel:
        channel_id = "webhook-dev"
        kind = "webhook"
        mode = "webhook"

        def __init__(self) -> None:
            self.sent = []

        async def start(self) -> None:
            pass

        async def stop(self) -> None:
            pass

        async def send(self, message: Any) -> None:
            self.sent.append(message)

    async def run() -> None:
        bus = MessageBus()
        channel = CaptureChannel()
        manager = ChannelManager(bus)
        manager.register(channel)
        await bus.publish_outbound(
            OutboundMessage(
                channel="webhook-dev",
                content="ok",
                session_id="webhook-dev:local:demo",
                finish_reason="stop",
            )
        )
        stop_event = asyncio.Event()
        stop_event.set()

        await manager.dispatch_outbound(stop_event)

        assert channel.sent[0].content == "ok"

    asyncio.run(run())
```

- [ ] **Step 2: Run failing manager test**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py::test_channel_manager_dispatches_by_channel_id -q
```

Expected: fail because the current protocol uses `name` and bus checks.

- [ ] **Step 3: Update adapter protocol**

Replace `ChannelAdapter` in `base.py` with:

```python
class ChannelAdapter(Protocol):
    """Minimal contract every runtime channel adapter must implement."""

    channel_id: str
    kind: str
    mode: str

    async def start(self) -> None:
        """Prepare the channel before messages are routed."""

    async def stop(self) -> None:
        """Stop accepting/routing channel messages."""

    async def send(self, message: OutboundMessage) -> None:
        """Deliver an outbound message to the concrete channel."""


class ChannelInboundSink(Protocol):
    async def accept_inbound(self, message: InboundMessage) -> Any:
        """Accept a normalized inbound message from an adapter."""
```

Import `InboundMessage` in `base.py`, and `Any` from `typing` if the module does not already import it.

- [ ] **Step 4: Update ChannelManager**

In `manager.py`:

```python
    def register(self, channel: ChannelAdapter) -> None:
        if self.started:
            raise RuntimeError("Cannot register channels after ChannelManager.start()")
        if channel.channel_id in self.channels:
            raise ValueError(f"Channel already registered: {channel.channel_id}")
        self.channels[channel.channel_id] = channel
```

The rest of dispatch remains keyed by `message.channel`.

- [ ] **Step 5: Migrate memory adapter to new protocol**

Change `MemoryChannelAdapter.__init__` signature:

```python
    def __init__(
        self,
        inbound_sink: ChannelInboundSink,
        *,
        channel_id: str = "memory-dev",
        kind: str = "memory",
        mode: str = "webhook",
        account_id: str = "memory",
    ) -> None:
        self.channel_id = channel_id
        self.kind = kind
        self.mode = mode
        self.account_id = account_id
        self.inbound_sink = inbound_sink
        self.started = False
        self.sent_messages: list[OutboundMessage] = []
```

Update `publish_external_text()` to build `ChannelIdentity` and call `self.inbound_sink.accept_inbound(message)`.

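The `publish_external_text()` change described in Step 5 could look roughly like the sketch below. It is self-contained for illustration only: `ChannelIdentity` and `InboundMessage` here are simplified stand-ins for the real `beaver.foundation.events` types, `RecordingSink` is a hypothetical sink, and the `publish_external_text()` parameters are assumptions, since the plan does not spell out the existing signature.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


# Stand-ins for the real beaver.foundation.events types; field sets are assumptions.
@dataclass
class ChannelIdentity:
    channel_id: str
    kind: str
    account_id: str
    peer_id: str
    message_id: str | None = None


@dataclass
class InboundMessage:
    channel: str
    content: str
    channel_identity: ChannelIdentity | None = None


class RecordingSink:
    """Hypothetical inbound sink that only remembers what it was given."""

    def __init__(self) -> None:
        self.received: list[InboundMessage] = []

    async def accept_inbound(self, message: InboundMessage) -> Any:
        self.received.append(message)
        return True


class MemoryChannelAdapter:
    def __init__(
        self,
        inbound_sink: RecordingSink,
        *,
        channel_id: str = "memory-dev",
        kind: str = "memory",
        account_id: str = "memory",
    ) -> None:
        self.channel_id = channel_id
        self.kind = kind
        self.account_id = account_id
        self.inbound_sink = inbound_sink

    async def publish_external_text(
        self, peer_id: str, text: str, *, message_id: str | None = None
    ) -> Any:
        # The adapter only describes where the message came from. It does not
        # pick a session id and it does not publish to the bus itself.
        identity = ChannelIdentity(
            channel_id=self.channel_id,
            kind=self.kind,
            account_id=self.account_id,
            peer_id=peer_id,
            message_id=message_id,
        )
        message = InboundMessage(
            channel=self.channel_id,
            content=text,
            channel_identity=identity,
        )
        return await self.inbound_sink.accept_inbound(message)
```

In this shape the sink (the runtime, in the real code) stays the single place where session ids are derived and dedupe is applied.
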
- [ ] **Step 6: Verify existing gateway tests after migration**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_imports.py -q
```

Expected: some existing tests fail until their assertions use the new session id format. Update every expected session id to `<channel_id>:<account_id>:<peer_id>[:<thread_id>]` and remove the old `telegram:chat-1` expectations.

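When rewriting those assertions, a small test helper can keep the expected values consistent with the `<channel_id>:<account_id>:<peer_id>[:<thread_id>]` format. The sketch below is a hypothetical helper (`expected_session_id` is not part of the codebase) that only mirrors the format string; the runtime's own `ChannelIdentity.session_id()` remains the source of truth.

```python
from __future__ import annotations


def expected_session_id(
    channel_id: str,
    account_id: str,
    peer_id: str,
    thread_id: str | None = None,
) -> str:
    """Build the session id a test should expect for a given identity."""
    parts = [channel_id, account_id, peer_id]
    # The thread segment is optional and is only appended when present.
    if thread_id:
        parts.append(thread_id)
    return ":".join(parts)
```

For example, an old `telegram:chat-1` style expectation for the memory adapter defaults would become `expected_session_id("memory-dev", "memory", "chat-1")`.
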
## Task 5: ChannelRuntime Core

**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/runtime.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Add failing runtime tests**

Append:

```python
from beaver.foundation.config.schema import ChannelConfig
from beaver.foundation.events import MessageBus
from beaver.interfaces.channels.runtime import ChannelRuntime


class FakeAgentService:
    is_running = True

    async def handle_inbound_message(self, inbound):
        return OutboundMessage(
            message_id=inbound.message_id,
            channel=inbound.channel,
            content=f"echo:{inbound.content}",
            session_id=inbound.session_id,
            finish_reason="stop",
            channel_identity=inbound.channel_identity,
        )


def test_channel_runtime_accept_inbound_normalizes_session_and_dedupes(tmp_path) -> None:
    async def run() -> None:
        bus = MessageBus()
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            bus=bus,
            workspace=tmp_path,
            channels={},
        )
        identity = ChannelIdentity(
            channel_id="webhook-dev",
            kind="webhook",
            account_id="local",
            peer_id="demo",
            message_id="msg-1",
        )
        result = await runtime.accept_inbound(
            InboundMessage(
                channel="webhook-dev",
                content="hello",
                session_id="wrong",
                channel_identity=identity,
            )
        )
        duplicate = await runtime.accept_inbound(
            InboundMessage(
                channel="webhook-dev",
                content="hello",
                channel_identity=identity,
            )
        )

        queued = await bus.consume_inbound()
        assert result.accepted is True
        assert queued.session_id == "webhook-dev:local:demo"
        assert duplicate.accepted is False
        assert duplicate.duplicate is True

    asyncio.run(run())
```

- [ ] **Step 2: Run failing runtime test**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_accept_inbound_normalizes_session_and_dedupes -q
```

Expected: fail because `ChannelRuntime` does not exist.

- [ ] **Step 3: Implement runtime result models and constructor**

Create `runtime.py` with:

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from beaver.foundation.config.schema import ChannelConfig
from beaver.foundation.events import InboundMessage, MessageBus
# Import from the submodules rather than the package: `channels/__init__.py`
# is modified in this task, and importing it from here risks a circular import.
from beaver.interfaces.channels.base import ChannelAdapter
from beaver.interfaces.channels.manager import ChannelManager
from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog
from beaver.services.agent_service import AgentService


@dataclass(slots=True)
class ChannelAcceptResult:
    accepted: bool
    duplicate: bool = False
    pending: bool = False
    rejected: bool = False
    session_id: str | None = None
    dedupe_key: str | None = None
    record: dict[str, Any] | None = None
    error: str | None = None


class ChannelRuntime:
    def __init__(
        self,
        *,
        service: AgentService,
        workspace: Path,
        channels: dict[str, ChannelConfig],
        bus: MessageBus | None = None,
    ) -> None:
        self.service = service
        self.workspace = workspace
        self.bus = bus or MessageBus()
        self.manager = ChannelManager(self.bus)
        self.channel_configs = dict(channels)
        self.adapters: dict[str, ChannelAdapter] = {}
        self.states: dict[str, dict[str, Any]] = {}
        state_dir = workspace / "state" / "channels"
        self.dedupe = ChannelDedupeStore(state_dir / "dedupe.json")
        self.events = ChannelEventLog(state_dir / "events.jsonl")
        self._bridge_task: asyncio.Task[None] | None = None
        self._dispatch_task: asyncio.Task[None] | None = None
        self._stop_event = asyncio.Event()
        self._dispatch_stop_event = asyncio.Event()
```

- [ ] **Step 4: Implement `accept_inbound()`**

Add:

```python
    async def accept_inbound(self, message: InboundMessage) -> ChannelAcceptResult:
        identity = message.channel_identity
        if identity is None:
            self.events.record(channel_id=message.channel, kind="inbound_rejected", status="error", error="channel_identity is required")
            return ChannelAcceptResult(accepted=False, rejected=True, error="channel_identity is required")
        validation_error = identity.validation_error()
        if validation_error:
            self.events.record(channel_id=identity.channel_id, kind="inbound_rejected", status="error", error=validation_error)
            return ChannelAcceptResult(accepted=False, rejected=True, error=validation_error)

        expected_session_id = identity.session_id()
        if message.session_id != expected_session_id:
            self.events.record(
                channel_id=identity.channel_id,
                kind="session_id_normalized",
                session_id=expected_session_id,
                message_id=identity.message_id,
                status="ok",
            )
        message.session_id = expected_session_id
        message.channel = identity.channel_id

        dedupe_key = identity.dedupe_key()
        if dedupe_key:
            write = self.dedupe.mark_processing(
                dedupe_key=dedupe_key,
                session_id=expected_session_id,
                message_id=identity.message_id or "",
            )
            if not write.created:
                record = write.record or {}
                self.events.record(
                    channel_id=identity.channel_id,
                    kind="inbound_duplicate",
                    session_id=expected_session_id,
                    message_id=identity.message_id,
                    status=str(record.get("status") or "processing"),
                )
                return ChannelAcceptResult(
                    accepted=False,
                    duplicate=True,
                    pending=record.get("status") == "processing",
                    session_id=expected_session_id,
                    dedupe_key=dedupe_key,
                    record=record,
                )

        self.events.record(
            channel_id=identity.channel_id,
            kind="inbound_accepted",
            session_id=expected_session_id,
            message_id=identity.message_id,
            status="ok",
            text=message.content,
        )
        await self.bus.publish_inbound(message)
        return ChannelAcceptResult(
            accepted=True,
            session_id=expected_session_id,
            dedupe_key=dedupe_key,
        )
```

- [ ] **Step 5: Implement bridge and dispatch lifecycle**

Add:

```python
    async def start(self) -> None:
        self._stop_event.clear()
        self._dispatch_stop_event.clear()
        self._bridge_task = asyncio.create_task(self._bridge_inbound_to_agent())
        self._dispatch_task = asyncio.create_task(self.manager.dispatch_outbound(self._dispatch_stop_event))

    async def stop(self) -> None:
        self._stop_event.set()
        self._dispatch_stop_event.set()
        for task in (self._bridge_task, self._dispatch_task):
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        await self.manager.stop()

    async def _bridge_inbound_to_agent(self) -> None:
        while not self._stop_event.is_set():
            try:
                inbound = await asyncio.wait_for(self.bus.consume_inbound(), timeout=0.25)
            except asyncio.TimeoutError:
                continue
            identity = inbound.channel_identity
            try:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_started",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                )
                outbound = await self.service.handle_inbound_message(inbound)
            except Exception as exc:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_failed",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    status="error",
                    error=str(exc),
                )
                outbound = AgentService.build_outbound_error(inbound, detail=str(exc), finish_reason="error")
            else:
                self.events.record(
                    channel_id=outbound.channel,
                    kind="direct_run_finished",
                    session_id=outbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    run_id=outbound.run_id,
                )
            await self.bus.publish_outbound(outbound)
```

- [ ] **Step 6: Verify runtime tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
```

Expected: pass for identity, state, and basic runtime tests.

## Task 6: Generic Webhook Reference Adapter

**Files:**
- Create: `app-instance/backend/beaver/interfaces/channels/generic_webhook.py`
- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Add failing generic webhook roundtrip test**

Append:

```python
from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter


def test_generic_webhook_adapter_waits_for_outbound_reply(tmp_path) -> None:
    async def run() -> None:
        bus = MessageBus()
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            bus=bus,
            workspace=tmp_path,
            channels={},
        )
        adapter = GenericWebhookAdapter(
            channel_id="webhook-dev",
            kind="webhook",
            mode="webhook",
            account_id="local",
            display_name="Webhook Dev",
            inbound_sink=runtime,
            response_timeout_seconds=1,
        )
        runtime.manager.register(adapter)
        await runtime.start()
        try:
            response = await adapter.handle_webhook_payload(
                {
                    "peer_id": "demo",
                    "message_id": "msg-1",
                    "text": "hello",
                    "peer_type": "dm",
                }
            )
        finally:
            await runtime.stop()

        assert response["ok"] is True
        assert response["reply"] == "echo:hello"
        assert response["session_id"] == "webhook-dev:local:demo"

    asyncio.run(run())
```

- [ ] **Step 2: Run failing adapter test**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q
```

Expected: fail because generic adapter does not exist.

- [ ] **Step 3: Implement generic adapter**

Create `generic_webhook.py`:

```python
from __future__ import annotations

import asyncio
from typing import Any

from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage
from beaver.interfaces.channels.base import ChannelInboundSink


class GenericWebhookAdapter:
    def __init__(
        self,
        *,
        channel_id: str,
        kind: str,
        mode: str,
        account_id: str,
        display_name: str = "",
        inbound_sink: ChannelInboundSink,
        response_timeout_seconds: float = 1800,
    ) -> None:
        self.channel_id = channel_id
        self.kind = kind
        self.mode = mode
        self.account_id = account_id
        self.display_name = display_name or channel_id
        self.inbound_sink = inbound_sink
        self.response_timeout_seconds = max(1.0, float(response_timeout_seconds))
        self.started = False
        # Webhook callers waiting for a reply, keyed by inbound message_id.
        self._pending: dict[str, asyncio.Future[OutboundMessage]] = {}

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False
        for future in list(self._pending.values()):
            if not future.done():
                future.cancel()
        self._pending.clear()

    async def handle_webhook_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        text = str(payload.get("text") or "").strip()
        peer_id = str(payload.get("peer_id") or "").strip()
        message_id = str(payload.get("message_id") or "").strip()
        thread_id = str(payload.get("thread_id") or "").strip() or None
        peer_type = str(payload.get("peer_type") or "unknown").strip() or "unknown"
        user_id = str(payload.get("user_id") or "").strip() or None
        if not text:
            return {"ok": False, "error": "text is required"}
        if not peer_id:
            return {"ok": False, "error": "peer_id is required"}
        if not message_id:
            return {"ok": False, "error": "message_id is required"}

        identity = ChannelIdentity(
            channel_id=self.channel_id,
            kind=self.kind,
            account_id=self.account_id,
            peer_id=peer_id,
            thread_id=thread_id,
            peer_type=peer_type,
            user_id=user_id,
            message_id=message_id,
        )
        inbound = InboundMessage(
            channel=self.channel_id,
            content=text,
            user_id=user_id,
            channel_identity=identity,
            metadata={"webhook": {"peer_type": peer_type}},
        )
        # Register the future before handing off, so a fast reply cannot race past it.
        future: asyncio.Future[OutboundMessage] = asyncio.get_running_loop().create_future()
        self._pending[inbound.message_id] = future
        accept = await self.inbound_sink.accept_inbound(inbound)
        if not accept.accepted:
            # Rejected or duplicate: answer from the stored dedupe record, if any.
            self._pending.pop(inbound.message_id, None)
            record = accept.record or {}
            return {
                "ok": accept.error is None,
                "duplicate": accept.duplicate,
                "pending": accept.pending,
                "session_id": accept.session_id,
                "status": record.get("status"),
                "run_id": record.get("run_id"),
                "reply": record.get("reply"),
                "error": accept.error or record.get("error"),
            }
        try:
            outbound = await asyncio.wait_for(future, timeout=self.response_timeout_seconds)
        except asyncio.TimeoutError:
            # The run is still in progress; report it as pending instead of failing.
            self._pending.pop(inbound.message_id, None)
            return {
                "ok": True,
                "duplicate": False,
                "pending": True,
                "session_id": accept.session_id,
            }
        return {
            "ok": outbound.finish_reason != "error",
            "duplicate": False,
            "pending": False,
            "session_id": outbound.session_id,
            "run_id": outbound.run_id,
            "reply": outbound.content,
            "error": outbound.metadata.get("error"),
        }

    async def send(self, message: OutboundMessage) -> None:
        # The outbound reply is expected to echo the inbound message_id.
        future = self._pending.pop(message.message_id, None)
        if future is None or future.done():
            return
        future.set_result(message)
```

- [ ] **Step 4: Mark dedupe done/error from runtime dispatch**

In `ChannelRuntime._bridge_inbound_to_agent()`, after `outbound` is created, mark the dedupe record as done or error whenever the inbound identity yields a `dedupe_key`:

```python
            dedupe_key = identity.dedupe_key() if identity else None
            if dedupe_key:
                if outbound.finish_reason == "error":
                    self.dedupe.mark_error(
                        dedupe_key=dedupe_key,
                        error=outbound.content,
                        max_error_chars=4000,
                    )
                else:
                    self.dedupe.mark_done(
                        dedupe_key=dedupe_key,
                        run_id=outbound.run_id,
                        reply=outbound.content,
                        max_reply_chars=20000,
                    )
```

- [ ] **Step 5: Record delivery events in ChannelManager or runtime wrapper**

Delivery events must be visible through `ChannelRuntime`. Either wrap the adapter `send()` call inside `ChannelManager.dispatch_outbound()`, or introduce a runtime-managed dispatch callback. A minimal v1 implementation can keep the manager's existing catch and append to `undeliverable`. If the manager stays event-unaware, add optional callbacks:

```python
    async def dispatch_outbound(
        self,
        stop_event: asyncio.Event,
        on_delivered: Callable[[OutboundMessage], Awaitable[None]] | None = None,
        on_failed: Callable[[OutboundMessage, Exception | None], Awaitable[None]] | None = None,
    ) -> None:
        ...  # existing dispatch loop, invoking the callbacks around adapter.send()
```

Import `Callable` and `Awaitable` from `collections.abc`. Use callbacks in `ChannelRuntime.start()` to record `outbound_delivered` and `outbound_delivery_failed`.

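To make the callback option concrete, here is a minimal, self-contained sketch of a dispatch loop that reports each delivery outcome. It is not the existing `ChannelManager`: `SketchManager`, its `asyncio.Queue`, the `poll_seconds` parameter, and the drain-then-exit shutdown rule are all assumptions made for illustration, and `OutboundMessage` is a simplified stand-in.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class OutboundMessage:  # stand-in for the real beaver type
    channel: str
    content: str


Delivered = Callable[[OutboundMessage], Awaitable[None]]
Failed = Callable[[OutboundMessage, "Exception | None"], Awaitable[None]]


class SketchManager:
    """Hypothetical manager; the real ChannelManager reads from MessageBus."""

    def __init__(self) -> None:
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()
        self.channels: dict[str, Any] = {}

    async def dispatch_outbound(
        self,
        stop_event: asyncio.Event,
        on_delivered: Delivered | None = None,
        on_failed: Failed | None = None,
        poll_seconds: float = 0.05,
    ) -> None:
        while True:
            try:
                message = await asyncio.wait_for(self.outbound.get(), timeout=poll_seconds)
            except asyncio.TimeoutError:
                # Queue is idle: exit only once a stop was requested.
                if stop_event.is_set():
                    return
                continue
            adapter = self.channels.get(message.channel)
            try:
                if adapter is None:
                    raise LookupError(f"No channel registered: {message.channel}")
                await adapter.send(message)
            except Exception as exc:
                if on_failed is not None:
                    await on_failed(message, exc)
            else:
                if on_delivered is not None:
                    await on_delivered(message)


async def demo() -> list[tuple[str, str]]:
    class NullAdapter:
        async def send(self, message: OutboundMessage) -> None:
            pass

    manager = SketchManager()
    manager.channels["webhook-dev"] = NullAdapter()
    events: list[tuple[str, str]] = []

    async def delivered(message: OutboundMessage) -> None:
        events.append(("outbound_delivered", message.channel))

    async def failed(message: OutboundMessage, exc: Exception | None) -> None:
        events.append(("outbound_delivery_failed", message.channel))

    await manager.outbound.put(OutboundMessage("webhook-dev", "ok"))
    await manager.outbound.put(OutboundMessage("missing", "lost"))
    stop_event = asyncio.Event()
    stop_event.set()
    await manager.dispatch_outbound(stop_event, delivered, failed)
    return events
```

In the real runtime, the two callbacks would call `self.events.record(...)` with the `outbound_delivered` and `outbound_delivery_failed` kinds instead of appending to a list.
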
- [ ] **Step 6: Verify generic webhook roundtrip**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q
```

Expected: pass.

## Task 7: Runtime Adapter Factory and Status

**Files:**
- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Add failing status/factory test**

Append:

```python
def test_channel_runtime_starts_enabled_generic_webhook_and_reports_status(tmp_path) -> None:
    async def run() -> None:
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            workspace=tmp_path,
            channels={
                "webhook-dev": ChannelConfig(
                    enabled=True,
                    kind="webhook",
                    mode="webhook",
                    account_id="local",
                    display_name="Webhook Dev",
                    config={"response_timeout_seconds": 1800},
                ),
                "off": ChannelConfig(
                    enabled=False,
                    kind="webhook",
                    mode="webhook",
                    account_id="local",
                ),
            },
        )
        await runtime.start()
        try:
            statuses = runtime.statuses()
        finally:
            await runtime.stop()

        by_id = {item["channel_id"]: item for item in statuses}
        assert by_id["webhook-dev"]["state"] == "running"
        assert by_id["webhook-dev"]["webhook_url"] == "/api/channels/webhook-dev/webhook"
        assert by_id["off"]["state"] == "disabled"

    asyncio.run(run())
```

- [ ] **Step 2: Implement adapter creation**

In `ChannelRuntime.start()`, before starting bridge/dispatch tasks:

```python
        for channel_id, cfg in self.channel_configs.items():
            if not cfg.enabled:
                self.states[channel_id] = {"state": "disabled", "last_error": None}
                continue
            try:
                adapter = self._build_adapter(channel_id, cfg)
                self.adapters[channel_id] = adapter
                self.manager.register(adapter)
                await adapter.start()
                self.states[channel_id] = {"state": "running", "last_error": None, "started_at": _iso_now()}
                self.events.record(channel_id=channel_id, kind="adapter_started")
            except Exception as exc:
                self.states[channel_id] = {"state": "error", "last_error": str(exc)}
                self.events.record(channel_id=channel_id, kind="adapter_error", status="error", error=str(exc))
```

The snippet assumes an `_iso_now()` helper that returns the current UTC time as an ISO 8601 string is available in `runtime.py`; import or define one if no earlier task already provides it.

Add `_build_adapter()`:

```python
    def _build_adapter(self, channel_id: str, cfg: ChannelConfig) -> ChannelAdapter:
        if cfg.kind != "webhook" or cfg.mode != "webhook":
            raise ValueError(f"Unsupported channel kind/mode: {cfg.kind}/{cfg.mode}")
        from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter

        return GenericWebhookAdapter(
            channel_id=channel_id,
            kind=cfg.kind,
            mode=cfg.mode,
            account_id=cfg.account_id,
            display_name=cfg.display_name,
            inbound_sink=self,
            response_timeout_seconds=float(cfg.config.get("response_timeout_seconds") or 1800),
        )
```

- [ ] **Step 3: Implement status helpers**

Add:

```python
    def statuses(self) -> list[dict[str, Any]]:
        items: list[dict[str, Any]] = []
        recent = self.events.recent(limit=500)
        last_by_channel = {event["channel_id"]: event for event in recent if event.get("channel_id")}
        for channel_id, cfg in self.channel_configs.items():
            state = self.states.get(channel_id, {"state": "configured", "last_error": None})
            items.append(
                {
                    "channel_id": channel_id,
                    "name": channel_id,
                    "kind": cfg.kind,
                    "mode": cfg.mode,
                    "display_name": cfg.display_name or channel_id,
                    "enabled": cfg.enabled,
                    "state": state.get("state", "configured"),
                    "account_id": cfg.account_id,
                    "last_error": state.get("last_error"),
                    "started_at": state.get("started_at"),
                    "last_event_at": last_by_channel.get(channel_id, {}).get("created_at"),
                    "capabilities": ["receive_text", "send_text", "sync_webhook_response"]
                    if cfg.kind == "webhook"
                    else [],
                    "webhook_url": f"/api/channels/{channel_id}/webhook" if cfg.kind == "webhook" else None,
                }
            )
        return items

    def recent_events(self, channel_id: str, *, limit: int = 100) -> list[dict[str, Any]]:
        return self.events.recent(channel_id=channel_id, limit=limit)
```

- [ ] **Step 4: Verify status/factory tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_starts_enabled_generic_webhook_and_reports_status -q
```

Expected: pass.

## Task 8: Web Backend Integration and Routes

**Files:**
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
- Modify: `app-instance/backend/beaver/interfaces/gateway/main.py`
- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py`

- [ ] **Step 1: Add failing Web API route test**

Add to `test_gateway_channels.py` or a focused web app test file:

```python
def test_web_app_exposes_channel_status_and_webhook(tmp_path) -> None:
    import json

    from fastapi.testclient import TestClient
    from beaver.foundation.config.loader import load_config
    from beaver.interfaces.web.app import create_app

    config_path = tmp_path / "config.json"
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    config_path.write_text(
        json.dumps(
            {
                "agents": {"defaults": {"model": "openai/gpt-5"}},
                "providers": {},
                "channels": {
                    "webhook-dev": {
                        "enabled": True,
                        "kind": "webhook",
                        "mode": "webhook",
                        "accountId": "local",
                    }
                },
            }
        ),
        encoding="utf-8",
    )

    app = create_app(workspace=workspace, config_path=config_path)
    with TestClient(app) as client:
        status = client.get("/api/status").json()
        assert status["channels"][0]["channel_id"] == "webhook-dev"
        assert status["channels"][0]["state"] == "running"
```

Use fake provider/service injection if the default app startup requires external provider calls. The test must not call a real LLM.

- [ ] **Step 2: Start ChannelRuntime in lifespan**

In `_app_lifespan`, after `AgentService.start()` and config boot, create and start runtime:

```python
loaded = attached_service.create_loop().boot()
channel_runtime = ChannelRuntime(
    service=attached_service,
    workspace=loaded.workspace,
    channels=loaded.config.channels,
)
fastapi_app.state.channel_runtime = channel_runtime
await channel_runtime.start()
```

On shutdown, stop it before or alongside cron shutdown:

```python
runtime = getattr(fastapi_app.state, "channel_runtime", None)
if runtime is not None:
    await runtime.stop()
```

- [ ] **Step 3: Add route helpers**

Add:

```python
def get_channel_runtime(request: Request) -> ChannelRuntime:
    runtime = getattr(request.app.state, "channel_runtime", None)
    if runtime is None:
        raise HTTPException(status_code=503, detail="Channel runtime is not running")
    return runtime
```

Routes:

```python
@app.get("/api/channels")
async def list_channels(request: Request) -> list[dict[str, Any]]:
    return get_channel_runtime(request).statuses()


@app.get("/api/channels/{channel_id}/events")
async def list_channel_events(channel_id: str, request: Request, limit: int = 100) -> list[dict[str, Any]]:
    return get_channel_runtime(request).recent_events(channel_id, limit=limit)


@app.post("/api/channels/{channel_id}/webhook")
async def post_channel_webhook(channel_id: str, request: Request) -> JSONResponse:
    runtime = get_channel_runtime(request)
    adapter = runtime.adapters.get(channel_id)
    if adapter is None or not hasattr(adapter, "handle_webhook_payload"):
        raise HTTPException(status_code=404, detail="Webhook channel not found")
    try:
        payload = await request.json()
    except ValueError as exc:
        # A malformed body raises json.JSONDecodeError (a ValueError); answer 400, not 500.
        raise HTTPException(status_code=400, detail="Webhook payload must be valid JSON") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    result = await adapter.handle_webhook_payload(payload)
    # 202 signals that the run is still in progress (timeout or in-flight duplicate).
    status_code = 202 if result.get("pending") else 200
    return JSONResponse(result, status_code=status_code)
```

Support `GET /api/channels/{channel_id}/webhook` only if a platform challenge needs it later. Generic v1 can omit GET.

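For a local smoke-test script, the webhook response can be interpreted as in the sketch below. `summarize_webhook_result` is a hypothetical helper, not part of the plan; it relies only on the response keys produced by `GenericWebhookAdapter.handle_webhook_payload()` (`ok`, `pending`, `duplicate`, `reply`, `error`) and on the 202/200 split in the route above.

```python
from __future__ import annotations

from typing import Any


def summarize_webhook_result(status_code: int, body: dict[str, Any]) -> str:
    """Reduce a webhook response to one human-readable line."""
    # 202 (or pending=True) means the run has not finished yet. Re-posting the
    # same message_id later is answered from the dedupe record.
    if status_code == 202 or body.get("pending"):
        return "pending"
    # A stored error can accompany ok=True on a duplicate, so check both.
    if not body.get("ok") or body.get("error"):
        return f"error: {body.get('error') or 'unknown'}"
    if body.get("duplicate"):
        return f"duplicate: {body.get('reply')}"
    return f"reply: {body.get('reply')}"
```

A script would call it with the HTTP status and parsed JSON body of each `POST /api/channels/<channel_id>/webhook` response.
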
- [ ] **Step 4: Include runtime controls and channel status in `/api/status`**

Replace hard-coded:

```python
"channels": [{"name": "web", "enabled": True}],
```

with:

```python
# Before building the response payload:
runtime = getattr(request.app.state, "channel_runtime", None)

# Inside the response payload, in place of the hard-coded entry:
"channels": runtime.statuses() if runtime is not None else [],
"runtime_controls": {
    "self_restart": _self_restart_enabled(),
},
```

`_self_restart_enabled()` is implemented in Task 9; add it first (or stub it) so `/api/status` does not raise `NameError`.

- [ ] **Step 5: Make `run_gateway()` a compatibility wrapper**

In `gateway/main.py`, keep the public signature if practical, but route lifecycle through `ChannelRuntime`. Existing tests that inject `channels` can either build a runtime with those adapters registered or be rewritten to target `ChannelRuntime` directly.

- [ ] **Step 6: Verify backend route tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_channel_runtime.py -q
```

Expected: pass without real network or real LLM calls.

## Task 9: Self-Restart API

**Files:**
- Modify: `app-instance/backend/beaver/interfaces/web/app.py`
- Modify: `app-instance/create-instance.sh`
- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py`

- [ ] **Step 1: Add restart status and disabled behavior tests**

Add:

```python
def test_self_restart_env_defaults_enabled(monkeypatch) -> None:
    from beaver.interfaces.web.app import _self_restart_enabled

    monkeypatch.delenv("BEAVER_ENABLE_SELF_RESTART", raising=False)

    assert _self_restart_enabled() is True


def test_self_restart_env_can_disable(monkeypatch) -> None:
    from beaver.interfaces.web.app import _self_restart_enabled

    monkeypatch.setenv("BEAVER_ENABLE_SELF_RESTART", "0")

    assert _self_restart_enabled() is False
```

- [ ] **Step 2: Implement restart helpers**

In `web/app.py`:

```python
def _self_restart_enabled() -> bool:
    return os.getenv("BEAVER_ENABLE_SELF_RESTART", "1").strip() not in {"0", "false", "False"}


def _schedule_self_restart(delay_seconds: float = 0.75) -> None:
    import os as _os
    import threading
    import time

    def _exit_later() -> None:
        # Delay so the 202 response can be sent before the process goes away.
        time.sleep(delay_seconds)
        # Hard exit; the container's restart policy is expected to bring the instance back.
        _os._exit(0)

    threading.Thread(target=_exit_later, daemon=True).start()
```

Add route:

```python
@app.post("/api/runtime/restart")
async def restart_runtime() -> JSONResponse:
    if not _self_restart_enabled():
        raise HTTPException(status_code=403, detail="Self restart is disabled")
    _schedule_self_restart()
    return JSONResponse({"ok": True, "restarting": True}, status_code=202)
```

- [ ] **Step 3: Ensure created containers keep default restart enabled**

In `app-instance/create-instance.sh`, add to `RUN_ARGS`:

```bash
-e "BEAVER_ENABLE_SELF_RESTART=1"
```

- [ ] **Step 4: Verify restart helper tests**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_self_restart_env_defaults_enabled tests/unit/test_channel_runtime.py::test_self_restart_env_can_disable -q
```

Expected: pass.

## Task 10: Nginx Timeout for Channel Webhooks

**Files:**
- Modify: `app-instance/nginx.conf`

- [ ] **Step 1: Add channel webhook location before generic `/api/`**

In `app-instance/nginx.conf`, before `location /api/`, add the block below. Assuming both are plain prefix locations, nginx selects the longest matching prefix regardless of order, so the placement is for readability only.

```nginx
location /api/channels/ {
    proxy_pass http://127.0.0.1:18080;
    proxy_read_timeout 1800;
    proxy_send_timeout 1800;
}
```

- [ ] **Step 2: Check nginx config syntax if nginx is available**

Run:

```bash
nginx -t -c /home/ivan/xuan/beaver_project/app-instance/nginx.conf
```

Expected: syntax ok. If nginx is not installed locally, record that this check was skipped and rely on container verification later.

## Task 11: Frontend API Types and Helpers

**Files:**
- Modify: `app-instance/frontend/types/index.ts`
- Modify: `app-instance/frontend/lib/api.ts`

- [ ] **Step 1: Update types**

Replace `ChannelStatus` with:

```ts
export interface ChannelStatus {
  channel_id: string;
  name?: string;
  kind: string;
  mode: string;
  display_name: string;
  enabled: boolean;
  state: 'configured' | 'disabled' | 'starting' | 'running' | 'degraded' | 'error' | 'stopped';
  account_id: string;
  last_error?: string | null;
  last_event_at?: string | null;
  started_at?: string | null;
  capabilities: string[];
  webhook_url?: string | null;
}

export interface ChannelEventRecord {
  event_id: string;
  channel_id: string;
  kind: string;
  session_id?: string | null;
  message_id?: string | null;
  run_id?: string | null;
  status: string;
  error?: string | null;
  text_preview?: string | null;
  text_length?: number;
  created_at: string;
  metadata?: Record<string, unknown>;
}

export interface RuntimeControls {
  self_restart: boolean;
}
```

Add to `SystemStatus`:

```ts
runtime_controls?: RuntimeControls;
```

- [ ] **Step 2: Add API helpers**

In `app-instance/frontend/lib/api.ts`:

```ts
export async function listChannelEvents(channelId: string, limit: number = 100): Promise<ChannelEventRecord[]> {
  return fetchJSON(`/api/channels/${encodeURIComponent(channelId)}/events?limit=${limit}`);
}

export async function restartRuntime(): Promise<{ ok: boolean; restarting: boolean }> {
  return fetchJSON('/api/runtime/restart', {
    method: 'POST',
    timeoutMs: 5000,
  });
}
```

Import `ChannelEventRecord`.

- [ ] **Step 3: Typecheck frontend**

Run:

```bash
cd app-instance/frontend
npm run typecheck
```

Expected: pass. If this repo does not define `typecheck`, run `npx tsc --noEmit`.

## Task 12: Status Page Channels and Restart UI

**Files:**
- Modify: `app-instance/frontend/app/(app)/status/page.tsx`

- [ ] **Step 1: Add UI state**

Add imports:

```ts
import { listChannelEvents, restartRuntime } from '@/lib/api';
import type { ChannelEventRecord, ChannelStatus, ProviderStatus, SystemStatus } from '@/types';
```

Add state:

```ts
const [selectedChannel, setSelectedChannel] = useState<ChannelStatus | null>(null);
const [channelEvents, setChannelEvents] = useState<ChannelEventRecord[]>([]);
const [loadingChannelEvents, setLoadingChannelEvents] = useState(false);
const [restartOpen, setRestartOpen] = useState(false);
const [restarting, setRestarting] = useState(false);
const [restartError, setRestartError] = useState<string | null>(null);
```

- [ ] **Step 2: Add handlers**

```ts
const openChannelDetails = async (channel: ChannelStatus) => {
  setSelectedChannel(channel);
  setChannelEvents([]);
  setLoadingChannelEvents(true);
  try {
    setChannelEvents(await listChannelEvents(channel.channel_id, 20));
  } catch {
    // Event loading is best-effort; the dialog falls back to its empty state.
    setChannelEvents([]);
  } finally {
    setLoadingChannelEvents(false);
  }
};

const handleRestart = async () => {
  setRestarting(true);
  setRestartError(null);
  try {
    await restartRuntime();
    setRestartOpen(false);
    // Give the instance time to come back before refreshing status.
    window.setTimeout(() => {
      void loadStatus();
    }, 5000);
  } catch (err: unknown) {
    // Avoid `any`: only read `.message` from real Error instances.
    const message = err instanceof Error ? err.message : '';
    setRestartError(message || pickAppText(locale, '重启失败', 'Restart failed'));
  } finally {
    setRestarting(false);
  }
};
```

- [ ] **Step 3: Add restart button to Instance runtime card**

Next to Runtime Logs:

```tsx
{status.runtime_controls?.self_restart !== false ? (
  <Button variant="outline" onClick={() => setRestartOpen(true)}>
    <RefreshCw className="w-4 h-4 mr-2" />
    {pickAppText(locale, '重启实例', 'Restart instance')}
  </Button>
) : null}
```

- [ ] **Step 4: Replace Channels card content**

Replace the current chip grid with compact rows:

```tsx
<div className="space-y-2">
  {status.channels.length === 0 ? (
    <p className="text-sm text-muted-foreground">
      {pickAppText(locale, '尚未配置通道', 'No channels configured')}
    </p>
  ) : (
    status.channels.map((ch) => (
      <button
        key={ch.channel_id}
        type="button"
        onClick={() => openChannelDetails(ch)}
        className="flex w-full items-center justify-between rounded-lg border px-3 py-2 text-left text-sm hover:bg-muted/40"
      >
        <span className="min-w-0">
          <span className="block truncate font-medium">{ch.display_name || ch.channel_id}</span>
          <span className="block truncate text-xs text-muted-foreground">
            {ch.channel_id} · {ch.kind}/{ch.mode} · {ch.account_id}
          </span>
        </span>
        <Badge variant={ch.state === 'running' ? 'default' : ch.state === 'error' ? 'destructive' : 'secondary'}>
          {ch.state}
        </Badge>
      </button>
    ))
  )}
</div>
```

- [ ] **Step 5: Add channel details dialog**

Add a dialog after the provider dialog:

```tsx
<Dialog open={Boolean(selectedChannel)} onOpenChange={(open) => !open && setSelectedChannel(null)}>
  <DialogContent className="sm:max-w-[720px]">
    <DialogHeader>
      <DialogTitle>{selectedChannel?.display_name || selectedChannel?.channel_id}</DialogTitle>
      <DialogDescription>
        {selectedChannel ? `${selectedChannel.kind}/${selectedChannel.mode} · ${selectedChannel.channel_id}` : ''}
      </DialogDescription>
    </DialogHeader>
    {selectedChannel ? (
      <div className="space-y-4">
        <div className="grid gap-2 text-sm sm:grid-cols-2">
          <InfoRow label="State" value={selectedChannel.state} />
          <InfoRow label="Account" value={selectedChannel.account_id || '-'} />
          <InfoRow label="Webhook" value={selectedChannel.webhook_url || '-'} />
          <InfoRow label="Last error" value={selectedChannel.last_error || '-'} />
        </div>
        <div className="space-y-2">
          <p className="text-sm font-medium">{pickAppText(locale, '最近事件', 'Recent events')}</p>
          {loadingChannelEvents ? (
            <Loader2 className="h-4 w-4 animate-spin text-muted-foreground" />
          ) : (
            <div className="max-h-[320px] overflow-auto rounded-md border">
              {channelEvents.map((event) => (
                <div key={event.event_id} className="border-b px-3 py-2 text-xs last:border-b-0">
                  <div className="flex items-center justify-between gap-2">
                    <span className="font-medium">{event.kind}</span>
                    <span className="text-muted-foreground">{event.created_at}</span>
                  </div>
                  <div className="mt-1 text-muted-foreground">
                    {event.status}{event.error ? ` · ${event.error}` : ''}
                  </div>
                </div>
              ))}
              {channelEvents.length === 0 ? (
                <p className="px-3 py-6 text-sm text-muted-foreground">
                  {pickAppText(locale, '暂无事件', 'No events yet')}
                </p>
              ) : null}
            </div>
          )}
        </div>
      </div>
    ) : null}
  </DialogContent>
</Dialog>
```

- [ ] **Step 6: Add restart confirmation dialog**

```tsx
<Dialog open={restartOpen} onOpenChange={setRestartOpen}>
  <DialogContent className="sm:max-w-[420px]">
    <DialogHeader>
      <DialogTitle>{pickAppText(locale, '重启实例?', 'Restart instance?')}</DialogTitle>
      <DialogDescription>
        {pickAppText(
          locale,
          '应用会短暂不可用,正在运行的对话和通道请求可能会中断。',
          'The app will be unavailable briefly. Running chats and channel requests may be interrupted.'
        )}
      </DialogDescription>
    </DialogHeader>
    {restartError ? <p className="text-sm text-destructive">{restartError}</p> : null}
    <DialogFooter>
      <Button variant="outline" onClick={() => setRestartOpen(false)} disabled={restarting}>
        {pickAppText(locale, '取消', 'Cancel')}
      </Button>
      <Button onClick={handleRestart} disabled={restarting}>
        {restarting ? <Loader2 className="mr-2 h-4 w-4 animate-spin" /> : null}
        {pickAppText(locale, '重启', 'Restart')}
      </Button>
    </DialogFooter>
  </DialogContent>
</Dialog>
```

- [ ] **Step 7: Verify frontend**

Run:

```bash
cd app-instance/frontend
npm test -- --run
npx tsc --noEmit
```

Expected: pass.

## Task 13: End-to-End Backend Verification

**Files:**
- Modify: backend tests as needed.

- [ ] **Step 1: Run backend unit tests for channel area**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py tests/unit/test_config_loader.py tests/unit/test_imports.py -q
```

Expected: pass.

- [ ] **Step 2: Run broader backend tests impacted by AgentService/session context**

Run:

```bash
cd app-instance/backend
uv run pytest tests/unit/test_agent_loop.py tests/unit/test_initial_skill_tool_hints.py tests/unit/test_marketplace_and_mcp.py -q
```

Expected: pass.

- [ ] **Step 3: Manually verify generic webhook in local app**

Start backend/frontend using the existing app-instance workflow. With config containing:

```json
{
  "channels": {
    "webhook-dev": {
      "enabled": true,
      "kind": "webhook",
      "mode": "webhook",
      "accountId": "local",
      "displayName": "Webhook Dev",
      "config": {
        "responseTimeoutSeconds": 1800
      },
      "secrets": {}
    }
  }
}
```

Send:

```bash
curl -sS -X POST http://127.0.0.1:18080/api/channels/webhook-dev/webhook \
  -H 'Content-Type: application/json' \
  -d '{"peer_id":"demo-user","thread_id":"main","message_id":"msg-001","text":"hello","peer_type":"dm"}'
```

Expected shape:

```json
{
  "ok": true,
  "duplicate": false,
  "pending": false,
  "session_id": "webhook-dev:local:demo-user:main",
  "reply": "..."
}
```

Repeat the same curl command. Expected: `"duplicate": true` with the cached reply (or a pending response if the first run has not finished), and no second agent execution.

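
The repeat-request behavior checked in this step can be pictured as a small cache keyed by session and `message_id`. The following is only a sketch under stated assumptions, not the plan's implementation: `DedupeCache`, `DedupeEntry`, `begin`, `complete`, and `webhook_response` are hypothetical names, the cache is in-memory only, and reporting `ok: false` for a failed run is a guess at the error shape.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class DedupeEntry:
    """State remembered for one inbound message_id (hypothetical shape)."""

    status: str = "pending"  # one of "pending", "done", "error"
    reply: Optional[str] = None
    error: Optional[str] = None


class DedupeCache:
    """In-memory sketch; a real runtime would likely bound or persist this."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, str], DedupeEntry] = {}

    def begin(self, session_id: str, message_id: str) -> tuple[bool, DedupeEntry]:
        """Return (duplicate, entry). Only a non-duplicate should start a run."""
        key = (session_id, message_id)
        existing = self._entries.get(key)
        if existing is not None:
            return True, existing
        entry = DedupeEntry()
        self._entries[key] = entry
        return False, entry

    def complete(
        self,
        session_id: str,
        message_id: str,
        *,
        reply: Optional[str] = None,
        error: Optional[str] = None,
    ) -> None:
        """Record the outcome of the single run started for this message."""
        entry = self._entries[(session_id, message_id)]
        entry.status = "error" if error is not None else "done"
        entry.reply = reply
        entry.error = error


def webhook_response(session_id: str, duplicate: bool, entry: DedupeEntry) -> dict:
    """Build a response in the shape shown in this step from a cache entry."""
    return {
        "ok": entry.status != "error",  # assumption: failed runs report ok=False
        "duplicate": duplicate,
        "pending": entry.status == "pending",
        "session_id": session_id,
        "reply": entry.reply,
    }
```

With this shape, the second curl call would hit `begin()` with the same key, get `duplicate=True`, and answer from the stored entry without starting another run.
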
- [ ] **Step 4: Manually verify status UI**

Open `/status` and confirm:

- Channel row shows `webhook-dev`, `webhook/webhook`, account `local`, state `running`.
- Details dialog shows webhook URL and recent events.
- Restart button opens confirmation dialog.
- Confirming restart calls `POST /api/runtime/restart` and the app comes back after the container restarts.

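
The first bullet can also be checked against the JSON behind the page before opening it. A minimal sketch, assuming the `/api/status` payload exposes a `channels` list whose entries carry the `ChannelStatus` fields from Task 11; `check_channel_row` and `EXPECTED_WEBHOOK_DEV` are hypothetical names, and fetching the payload is left out.

```python
from __future__ import annotations

from typing import Any

# Values the manual check expects for the reference webhook channel.
EXPECTED_WEBHOOK_DEV = {
    "kind": "webhook",
    "mode": "webhook",
    "account_id": "local",
    "state": "running",
}


def check_channel_row(
    status: dict[str, Any], channel_id: str, expected: dict[str, str]
) -> list[str]:
    """Return human-readable mismatches; an empty list means the row looks right."""
    rows = [
        ch for ch in status.get("channels", []) if ch.get("channel_id") == channel_id
    ]
    if not rows:
        return [f"channel {channel_id!r} not found in status payload"]
    row = rows[0]
    return [
        f"{field}: expected {want!r}, got {row.get(field)!r}"
        for field, want in expected.items()
        if row.get(field) != want
    ]
```

Calling `check_channel_row(payload, "webhook-dev", EXPECTED_WEBHOOK_DEV)` on a decoded status response gives a quick pass/fail list to record next to the manual UI check.
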
## Acceptance Criteria

- All channel inbound messages, including the generic sync webhook, use the strict bus-first path:

```text
adapter -> ChannelRuntime.accept_inbound() -> MessageBus.inbound -> bridge -> AgentService.handle_inbound_message() -> MessageBus.outbound -> ChannelManager.dispatch_outbound() -> adapter.send()
```

- No adapter directly calls `AgentService`.
- No adapter directly publishes to `MessageBus`.
- New session ids always use `<channel_id>:<account_id>:<peer_id>[:<thread_id>]`.
- Inbound messages with a missing `ChannelIdentity`, `account_id`, or `peer_id` are rejected and recorded as `inbound_rejected`.
- Runtime overwrites an inconsistent inbound `session_id` and records `session_id_normalized`.
- Generic webhook accepts only fixed-schema text payloads.
- Generic webhook default response timeout is 1800 seconds.
- Generic webhook timeout returns HTTP 202 and does not cancel the backend run.
- Late generic webhook outbound is recorded as `outbound_unclaimed`.
- Dedupe cache returns pending/done/error for repeated `message_id` without rerunning the agent.
- Event log is persisted under workspace and does not store raw payloads or full conversation text.
- `/api/status` returns runtime channel status and runtime controls.
- `/status` displays compact channel status, loads channel event details on demand, and has a restart button that requires confirmation.
- `POST /api/runtime/restart` is enabled by default and can be disabled with `BEAVER_ENABLE_SELF_RESTART=0`.
- AuthZ, real platform adapters, cron proactive delivery, streaming/progress, attachments, hot reload, and config UI are not implemented in v1.

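
The session id, rejection, and normalization criteria above can be illustrated together. This is a sketch under stated assumptions rather than the runtime's code: the `ChannelIdentity` here is a simplified stand-in with only the four fields used by the id format, `InboundRejected`, `build_session_id`, and `normalize_session_id` are hypothetical names, and treating an absent claimed id as "nothing to normalize" is a guess.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChannelIdentity:
    """Simplified stand-in; the real class may carry more fields."""

    channel_id: str
    account_id: str
    peer_id: str
    thread_id: Optional[str] = None


class InboundRejected(ValueError):
    """Raised where the runtime would record an `inbound_rejected` event."""


def build_session_id(identity: Optional[ChannelIdentity]) -> str:
    """Derive `<channel_id>:<account_id>:<peer_id>[:<thread_id>]`."""
    if identity is None:
        raise InboundRejected("missing ChannelIdentity")
    if not identity.account_id:
        raise InboundRejected("missing account_id")
    if not identity.peer_id:
        raise InboundRejected("missing peer_id")
    parts = [identity.channel_id, identity.account_id, identity.peer_id]
    if identity.thread_id:
        parts.append(identity.thread_id)
    return ":".join(parts)


def normalize_session_id(
    identity: Optional[ChannelIdentity], claimed: Optional[str]
) -> tuple[str, bool]:
    """Return (session_id, normalized).

    `normalized` is True when the caller supplied a different id, which is
    where the runtime would record `session_id_normalized`.
    """
    expected = build_session_id(identity)
    return expected, claimed is not None and claimed != expected
```

For the Task 13 example payload, `build_session_id(ChannelIdentity("webhook-dev", "local", "demo-user", "main"))` yields the same `webhook-dev:local:demo-user:main` id that the webhook response is expected to report.
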
## V2 Backlog

- Telegram adapter.
- Slack adapter.
- WhatsApp adapter.
- Platform signature validation and later AuthZ integration if needed.
- Channel config UI with secret masking.
- Hot reload or adapter restart API.
- Cron proactive channel delivery.
- Attachment handling.
- Streaming, typing, and progress messages where platform supports them.
- Retry/replay UI for failed outbound delivery.
- Rich event filtering/search.

## Execution Options

Plan complete and saved to `docs/superpowers/plans/2026-06-01-channel-runtime-v1.md`.

Two execution options:

1. **Subagent-Driven (recommended)** - Dispatch a fresh subagent per task, review between tasks, fast iteration.
2. **Inline Execution** - Execute tasks in this session using executing-plans, batch execution with checkpoints.

Which approach?