Channel Runtime V1 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Build Beaver Channel Runtime v1: a local-configured, bus-first channel runtime with a generic text webhook reference adapter, stable channel identity/session mapping, dedupe, delivery/event logs, /status visibility, and a restart-instance control.
Architecture: Channels are runtime integrations hosted inside the Web backend process. Every inbound message must pass through MessageBus.inbound, the gateway bridge, AgentService.handle_inbound_message(), MessageBus.outbound, ChannelManager.dispatch_outbound(), and adapter send(). Adapter code never calls AgentService directly and never publishes directly to the bus.
Tech Stack: FastAPI/Python backend, dataclasses, local JSON/JSONL state under workspace, Next.js 13 app router, React, TypeScript, Vitest, pytest.
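The bus-first path named in the Architecture paragraph can be pictured with a minimal sketch. This is an illustration only: SketchBus, Msg, bridge_once, and dispatch_once are hypothetical stand-ins, not the real MessageBus, gateway bridge, or ChannelManager, and the echo reply stands in for AgentService.handle_inbound_message().

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Msg:
    channel: str
    content: str


class SketchBus:
    """Hypothetical stand-in for MessageBus: one inbound and one outbound queue."""

    def __init__(self) -> None:
        self.inbound: asyncio.Queue = asyncio.Queue()
        self.outbound: asyncio.Queue = asyncio.Queue()


async def bridge_once(bus: SketchBus) -> None:
    """Gateway bridge step: inbound queue -> agent -> outbound queue."""
    inbound = await bus.inbound.get()
    # Stand-in for AgentService.handle_inbound_message().
    reply = Msg(channel=inbound.channel, content=f"echo:{inbound.content}")
    await bus.outbound.put(reply)


async def dispatch_once(bus: SketchBus, adapters: dict) -> None:
    """Outbound dispatch step: route by channel id to the adapter's send()."""
    outbound = await bus.outbound.get()
    adapters[outbound.channel].append(outbound)  # stand-in for adapter.send()


async def roundtrip(text: str) -> str:
    bus = SketchBus()
    sent: dict = {"webhook-dev": []}
    # The runtime, not the adapter, publishes to the inbound queue.
    await bus.inbound.put(Msg(channel="webhook-dev", content=text))
    await bridge_once(bus)
    await dispatch_once(bus, sent)
    return sent["webhook-dev"][0].content
```

Calling asyncio.run(roundtrip("hello")) walks one message through every hop; at no point does adapter-side code touch the agent or the queues directly.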
Locked Decisions
- Channel v1 does not use AuthZ, OAuth, token introspection, permission scopes, pairing, allowlists, or platform signature validation.
- Channel config lives in local Beaver config under BeaverConfig.channels.
- All channel inbound messages run as chat/direct turns. Channel v1 does not create Tasks and does not support task prefixes.
- Session ids are generated only by the runtime from ChannelIdentity: <channel_id>:<account_id>:<peer_id>[:<thread_id>]
- channel_id, kind, and account_id are separate concepts:
  - channel_id: Beaver adapter instance id, for example webhook-dev.
  - kind: adapter/platform kind, for example webhook.
  - account_id: stable platform account/workspace/bot/phone identity.
- ChannelIdentity.channel_id is the canonical field. Existing InboundMessage.channel and OutboundMessage.channel remain for compatibility, but their semantic value is channel_id.
- Adapters do not hold MessageBus. They submit inbound messages through ChannelInboundSink.accept_inbound().
- All enabled channels start automatically with the Web backend. Disabled channels are listed as disabled and are not instantiated.
- Config changes require backend restart in v1. No hot reload.
- /status gets a compact Channels section with details on demand. No independent channel configuration page in v1.
- Channel config UI is not included. Config is hand-edited in local config.
- V1 implements only a generic fixed-schema text webhook reference adapter. No Telegram, Slack, WhatsApp, email, or Outlook channel adapter in v1.
- Generic webhook waits synchronously for the final reply through the bus-first path. It defaults to a 30 minute response timeout. Timeout returns 202 accepted and does not cancel the backend run.
- Generic webhook late outbound after request timeout is recorded as outbound_unclaimed.
- Dedupe result cache is persisted locally, defaults to 48 hours retention, and stores cached replies up to 20000 characters.
- Delivery failures are recorded but not retried automatically.
- Cron proactive channel delivery is not included in v1. Existing cron deliver/channel/to fields remain reserved.
- Streaming, typing indicators, tool progress, intermediate messages, attachment handling, multimedia, hot reload, config UI, and real platform adapters are v2 or later.
- /status restores a restart-instance button with confirmation. Restart uses local backend self-restart through POST /api/runtime/restart; it is enabled by default and can be disabled with BEAVER_ENABLE_SELF_RESTART=0.
File Structure
- Modify: app-instance/backend/beaver/foundation/config/schema.py
  - Add ChannelConfig and channels: dict[str, ChannelConfig].
- Modify: app-instance/backend/beaver/foundation/config/loader.py
  - Parse channels from local config.
- Modify: app-instance/backend/tests/unit/test_config_loader.py
  - Cover channel config parsing.
- Modify: app-instance/backend/beaver/foundation/events/message_bus.py
  - Add ChannelIdentity, add optional channel_identity to inbound/outbound messages, and add content type fields.
- Modify: app-instance/backend/beaver/engine/context/builder.py
  - Extend SessionContext with normalized channel identity fields and render them into model-visible context.
- Modify: app-instance/backend/beaver/engine/loop.py
  - Pass channel identity fields into SessionContext.
- Modify: app-instance/backend/beaver/services/agent_service.py
  - Preserve channel_identity on outbound, pass normalized channel context to direct runs, and keep all channel runs in chat/direct mode.
- Modify: app-instance/backend/beaver/interfaces/channels/base.py
  - Replace bus-owning adapter protocol with channel id, kind, mode, lifecycle, and outbound send.
- Modify: app-instance/backend/beaver/interfaces/channels/manager.py
  - Dispatch by channel.channel_id; adapters no longer expose bus.
- Modify: app-instance/backend/beaver/interfaces/channels/memory.py
  - Update test adapter to new identity/session model without old two-segment session ids.
- Create: app-instance/backend/beaver/interfaces/channels/state.py
  - Persistent dedupe store and channel event log.
- Create: app-instance/backend/beaver/interfaces/channels/generic_webhook.py
  - Fixed-schema generic text webhook reference adapter.
- Create: app-instance/backend/beaver/interfaces/channels/runtime.py
  - ChannelRuntime, adapter factory, inbound bridge, outbound dispatch lifecycle, status and events API helpers.
- Modify: app-instance/backend/beaver/interfaces/gateway/main.py
  - Turn run_gateway() into a compatibility wrapper around ChannelRuntime.
- Modify: app-instance/backend/beaver/interfaces/web/app.py
  - Start/stop ChannelRuntime, add channel routes, expose channel status/events, add runtime restart API, include runtime controls in /api/status.
- Modify: app-instance/nginx.conf
  - Add long proxy timeout for channel webhook API path.
- Modify: app-instance/create-instance.sh
  - Set BEAVER_ENABLE_SELF_RESTART=1 in created containers.
- Modify: app-instance/frontend/types/index.ts
  - Expand channel and runtime control types.
- Modify: app-instance/frontend/lib/api.ts
  - Add channel event and restart API helpers.
- Modify: app-instance/frontend/app/(app)/status/page.tsx
  - Replace hard-coded channel chips with compact runtime channel table, details dialog, and restart confirmation button.
- Modify: app-instance/backend/tests/unit/test_gateway_channels.py
  - Update existing tests to new bus-first runtime semantics and add generic webhook roundtrip coverage.
- Create: app-instance/backend/tests/unit/test_channel_runtime.py
  - Unit tests for identity/session mapping, dedupe, event log, adapter factory, and restart guard behavior.
- Modify/Create frontend tests as needed:
  - app-instance/frontend/lib/api and status page behavior should be covered through the existing Vitest setup, or through a focused component test if the test harness supports it.
Data Contracts
Local Config Shape
{
"channels": {
"webhook-dev": {
"enabled": true,
"kind": "webhook",
"mode": "webhook",
"accountId": "local",
"displayName": "Webhook Dev",
"config": {
"responseTimeoutSeconds": 1800,
"dedupeRetentionHours": 48,
"maxCachedReplyChars": 20000,
"maxCachedErrorChars": 4000
},
"secrets": {}
}
}
}
Generic Webhook Request
{
"peer_id": "demo-user",
"thread_id": "main",
"message_id": "msg-001",
"text": "hello",
"peer_type": "dm",
"user_id": "optional-user"
}
channel_id, kind, and account_id are read from config. The request body is not trusted for those fields.
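As a rough sketch of that trust boundary, the function below merges config-owned identity with the caller-supplied routing fields. The name normalize_webhook_body is hypothetical, and two details are assumptions rather than part of the plan: only peer_id and text are treated as required, and a missing peer_type falls back to "unknown".

```python
from typing import Any


def normalize_webhook_body(
    body: dict[str, Any],
    *,
    channel_id: str,
    kind: str,
    account_id: str,
) -> dict[str, Any]:
    """Merge config-owned identity with caller-supplied routing fields."""
    peer_id = str(body.get("peer_id") or "").strip()
    text = body.get("text")
    if not peer_id:
        raise ValueError("peer_id is required")
    if not isinstance(text, str) or not text:
        # Assumption: empty or non-string text is rejected.
        raise ValueError("text is required")
    return {
        # Always taken from config, even if the body carries the same keys.
        "channel_id": channel_id,
        "kind": kind,
        "account_id": account_id,
        "peer_id": peer_id,
        "thread_id": body.get("thread_id") or None,
        "message_id": body.get("message_id") or None,
        "peer_type": body.get("peer_type") or "unknown",
        "user_id": body.get("user_id") or None,
        "text": text,
    }
```

A body that tries to set channel_id or account_id is accepted, but those keys are silently overridden by the configured values.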
Generic Webhook Response
Success:
{
"ok": true,
"duplicate": false,
"pending": false,
"session_id": "webhook-dev:local:demo-user:main",
"run_id": "run-1",
"reply": "assistant response"
}
Processing duplicate:
{
"ok": true,
"duplicate": true,
"pending": true,
"session_id": "webhook-dev:local:demo-user:main"
}
Completed duplicate:
{
"ok": true,
"duplicate": true,
"pending": false,
"session_id": "webhook-dev:local:demo-user:main",
"run_id": "run-1",
"reply": "assistant response"
}
Timed out waiting for reply:
{
"ok": true,
"duplicate": false,
"pending": true,
"session_id": "webhook-dev:local:demo-user:main"
}
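The timed-out case depends on waiting for the reply without cancelling the run behind it. One way to get that behavior with asyncio is to wrap the awaited future in asyncio.shield() before applying the timeout. The sketch below is not the adapter implementation: wait_for_reply is a hypothetical helper, the 200 status for the success case is an assumption (the plan only specifies 202 for the timeout), and run_id is omitted for brevity.

```python
import asyncio
from typing import Any


async def wait_for_reply(
    run: "asyncio.Future[str]",
    *,
    session_id: str,
    timeout_seconds: float,
) -> tuple[int, dict[str, Any]]:
    """Wait for the final reply; on timeout report pending and leave the run alive."""
    try:
        # shield() absorbs the cancellation that wait_for() issues on timeout,
        # so the underlying run keeps going.
        reply = await asyncio.wait_for(asyncio.shield(run), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return 202, {"ok": True, "duplicate": False, "pending": True, "session_id": session_id}
    return 200, {
        "ok": True,
        "duplicate": False,
        "pending": False,
        "session_id": session_id,
        "reply": reply,
    }
```

Because the run is still alive after the timeout, its eventual outbound message has no waiting request to claim it, which is the situation the outbound_unclaimed event records.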
Channel Status
{
"channel_id": "webhook-dev",
"kind": "webhook",
"mode": "webhook",
"display_name": "Webhook Dev",
"enabled": true,
"state": "running",
"account_id": "local",
"last_error": null,
"last_event_at": "2026-06-01T10:00:00Z",
"started_at": "2026-06-01T09:59:00Z",
"capabilities": ["receive_text", "send_text", "sync_webhook_response"],
"webhook_url": "/api/channels/webhook-dev/webhook"
}
Allowed states:
configured
disabled
starting
running
degraded
error
stopped
Channel Events
Minimum event kinds:
adapter_started
adapter_stopped
webhook_received
inbound_accepted
inbound_duplicate
inbound_rejected
session_id_normalized
direct_run_started
direct_run_finished
direct_run_failed
outbound_delivered
outbound_delivery_failed
outbound_unclaimed
webhook_response_timeout
adapter_error
Event log entries do not store full user message content or raw webhook payloads.
{
"event_id": "evt-1",
"channel_id": "webhook-dev",
"kind": "inbound_accepted",
"session_id": "webhook-dev:local:demo-user:main",
"message_id": "msg-001",
"run_id": null,
"status": "ok",
"error": null,
"text_preview": "hello",
"text_length": 5,
"created_at": "2026-06-01T10:00:00Z"
}
Task 1: Channel Config Schema
Files:
- Modify: app-instance/backend/beaver/foundation/config/schema.py
- Modify: app-instance/backend/beaver/foundation/config/loader.py
- Modify: app-instance/backend/tests/unit/test_config_loader.py
- Step 1: Write failing config loader test
Add this test to app-instance/backend/tests/unit/test_config_loader.py:
def test_config_loader_reads_channels(tmp_path) -> None:
    config_path = tmp_path / "config.json"
    config_path.write_text(
        json.dumps(
            {
                "agents": {"defaults": {"model": "openai/gpt-5"}},
                "channels": {
                    "webhook-dev": {
                        "enabled": True,
                        "kind": "webhook",
                        "mode": "webhook",
                        "accountId": "local",
                        "displayName": "Webhook Dev",
                        "config": {
                            "responseTimeoutSeconds": 1800,
                            "dedupeRetentionHours": 48,
                        },
                        "secrets": {"ignored_for_status": "secret-value"},
                    }
                },
            }
        ),
        encoding="utf-8",
    )

    config = load_config(config_path=config_path)
    channel = config.channels["webhook-dev"]

    assert channel.enabled is True
    assert channel.kind == "webhook"
    assert channel.mode == "webhook"
    assert channel.account_id == "local"
    assert channel.display_name == "Webhook Dev"
    assert channel.config["response_timeout_seconds"] == 1800
    assert channel.config["dedupe_retention_hours"] == 48
    assert channel.secrets == {"ignored_for_status": "secret-value"}
- Step 2: Run failing test
Run:
cd app-instance/backend
uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q
Expected: fail because BeaverConfig has no channels field.
- Step 3: Add ChannelConfig to config schema
In app-instance/backend/beaver/foundation/config/schema.py, add:
@dataclass(slots=True)
class ChannelConfig:
    """One configured channel adapter instance."""

    enabled: bool = False
    kind: str = ""
    mode: str = "webhook"
    account_id: str = ""
    display_name: str = ""
    config: dict[str, Any] = field(default_factory=dict)
    secrets: dict[str, str] = field(default_factory=dict)
Then add this field to BeaverConfig:
channels: dict[str, ChannelConfig] = field(default_factory=dict)
- Step 4: Parse channel config
In app-instance/backend/beaver/foundation/config/loader.py, add helper functions near the other config parsing helpers:
def _camel_to_snake_key(value: str) -> str:
    result: list[str] = []
    for char in value:
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


def _normalize_config_map(value: Any) -> dict[str, Any]:
    if not isinstance(value, dict):
        return {}
    return {
        _camel_to_snake_key(str(key)): item
        for key, item in value.items()
        if str(key).strip()
    }


def _parse_channel_config(payload: Any) -> ChannelConfig:
    if not isinstance(payload, dict):
        return ChannelConfig()
    secrets = payload.get("secrets", {})
    if not isinstance(secrets, dict):
        secrets = {}
    return ChannelConfig(
        enabled=bool(payload.get("enabled", False)),
        kind=_string(payload.get("kind")),
        mode=_string(payload.get("mode")) or "webhook",
        account_id=_string(payload.get("accountId") or payload.get("account_id")),
        display_name=_string(payload.get("displayName") or payload.get("display_name")),
        config=_normalize_config_map(payload.get("config")),
        secrets={str(key): str(value) for key, value in secrets.items() if str(key).strip()},
    )
Import ChannelConfig from schema, then wire it into load_config():
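channels_payload = data.get("channels", {})
# Guard before iterating: a non-dict value (for example a list) has no .items(),
# so the type check cannot live inside the comprehension's filter.
if not isinstance(channels_payload, dict):
    channels_payload = {}
channels = {
    str(channel_id): _parse_channel_config(payload)
    for channel_id, payload in channels_payload.items()
    if str(channel_id).strip()
}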
Pass channels=channels into BeaverConfig(...).
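To make the key normalization concrete, here is a standalone copy of the camel-to-snake rule from Step 4 applied to a few keys. The function is renamed camel_to_snake_key so the snippet runs on its own, and HTTPTimeout is a hypothetical key that does not appear in the plan's config; it is included only to show how the rule treats runs of capitals.

```python
def camel_to_snake_key(value: str) -> str:
    """Same rule as the Step 4 helper: underscore before every non-leading capital."""
    result: list[str] = []
    for char in value:
        if char.isupper() and result:
            result.append("_")
        result.append(char.lower())
    return "".join(result)


examples = {
    key: camel_to_snake_key(key)
    for key in (
        "responseTimeoutSeconds",
        "maxCachedReplyChars",
        "dedupe_retention_hours",  # already snake_case: passes through unchanged
        "HTTPTimeout",  # hypothetical key, not part of the plan's config
    )
}
```

Here examples maps responseTimeoutSeconds to response_timeout_seconds, while HTTPTimeout becomes h_t_t_p_timeout because every capital gets its own underscore. Config keys containing acronyms are therefore best written in snake_case directly.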
- Step 5: Verify config test passes
Run:
cd app-instance/backend
uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q
Expected: pass.
Task 2: Channel Identity and Message Models
Files:
- Modify: app-instance/backend/beaver/foundation/events/message_bus.py
- Modify: app-instance/backend/beaver/engine/context/builder.py
- Modify: app-instance/backend/beaver/engine/loop.py
- Modify: app-instance/backend/beaver/services/agent_service.py
- Modify: app-instance/backend/tests/unit/test_imports.py
- Create or update: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Write failing identity tests
Create app-instance/backend/tests/unit/test_channel_runtime.py with:
from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage


def test_channel_identity_builds_stable_session_id() -> None:
    identity = ChannelIdentity(
        channel_id="webhook-dev",
        kind="webhook",
        account_id="local",
        peer_id="demo-user",
        thread_id="main",
        peer_type="dm",
        message_id="msg-1",
    )

    assert identity.session_id() == "webhook-dev:local:demo-user:main"
    assert identity.dedupe_key() == "webhook-dev:local:demo-user:main:msg-1"


def test_channel_identity_requires_routing_fields() -> None:
    identity = ChannelIdentity(channel_id="webhook-dev", kind="webhook", account_id="", peer_id="demo")

    assert identity.validation_error() == "account_id is required"


def test_messages_carry_channel_identity() -> None:
    identity = ChannelIdentity(
        channel_id="webhook-dev",
        kind="webhook",
        account_id="local",
        peer_id="demo-user",
        message_id="msg-1",
    )
    inbound = InboundMessage(channel="webhook-dev", content="hello", channel_identity=identity)
    outbound = OutboundMessage(
        channel="webhook-dev",
        content="ok",
        session_id=identity.session_id(),
        finish_reason="stop",
        channel_identity=identity,
    )

    assert inbound.channel_identity is identity
    assert outbound.channel_identity is identity
- Step 2: Run failing tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_identity_builds_stable_session_id tests/unit/test_channel_runtime.py::test_channel_identity_requires_routing_fields tests/unit/test_channel_runtime.py::test_messages_carry_channel_identity -q
Expected: fail because ChannelIdentity does not exist.
- Step 3: Implement identity model
In app-instance/backend/beaver/foundation/events/message_bus.py, add:
@dataclass(slots=True)
class ChannelIdentity:
    """Normalized channel routing identity.

    channel_id is the Beaver adapter instance id, not the platform kind.
    """

    channel_id: str
    kind: str
    account_id: str
    peer_id: str
    thread_id: str | None = None
    peer_type: str = "unknown"
    user_id: str | None = None
    message_id: str | None = None

    def validation_error(self) -> str | None:
        if not self.channel_id.strip():
            return "channel_id is required"
        if not self.account_id.strip():
            return "account_id is required"
        if not self.peer_id.strip():
            return "peer_id is required"
        return None

    def session_id(self) -> str:
        parts = [self.channel_id, self.account_id, self.peer_id]
        if self.thread_id:
            parts.append(self.thread_id)
        return ":".join(_clean_session_part(part) for part in parts)

    def dedupe_key(self) -> str | None:
        if not self.message_id:
            return None
        return f"{self.session_id()}:{_clean_session_part(self.message_id)}"


def _clean_session_part(value: str) -> str:
    cleaned = str(value).strip()
    if not cleaned:
        return "unknown"
    return cleaned.replace(":", "_")
Add fields to InboundMessage:
content_type: str = "text"
channel_identity: ChannelIdentity | None = None
Add fields to OutboundMessage:
content_type: str = "text"
channel_identity: ChannelIdentity | None = None
Update app-instance/backend/beaver/foundation/events/__init__.py to export ChannelIdentity.
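The session id rule from Step 3 can be exercised in isolation. The snippet below is a function-level restatement for illustration, not a replacement for ChannelIdentity; build_session_id and clean_part are hypothetical names.

```python
from typing import Optional


def clean_part(value: str) -> str:
    """Trim, substitute 'unknown' for empty parts, and neutralize the separator."""
    cleaned = str(value).strip()
    return cleaned.replace(":", "_") if cleaned else "unknown"


def build_session_id(
    channel_id: str,
    account_id: str,
    peer_id: str,
    thread_id: Optional[str] = None,
) -> str:
    """<channel_id>:<account_id>:<peer_id>[:<thread_id>] with cleaned parts."""
    parts = [channel_id, account_id, peer_id]
    if thread_id:
        parts.append(thread_id)
    return ":".join(clean_part(part) for part in parts)


with_thread = build_session_id("webhook-dev", "local", "demo-user", "main")
without_thread = build_session_id("webhook-dev", "local", "demo-user")
colon_peer = build_session_id("webhook-dev", "local", "room:42")
underscore_peer = build_session_id("webhook-dev", "local", "room_42")
```

Replacing ":" inside a part keeps the segment count stable, so a thread-less id always has exactly three segments. The trade-off is that room:42 and room_42 map to the same session id, which only matters if a platform uses both forms as distinct peers.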
- Step 4: Preserve identity in AgentService outbound
In AgentService.build_outbound_message(), add:
channel_identity=inbound.channel_identity,
content_type=inbound.content_type,
In AgentService.build_outbound_error(), add the same two fields.
- Step 5: Pass channel context into direct runs
In AgentService.handle_inbound_message(), pass normalized channel fields:
channel_identity = inbound.channel_identity
result = await self.submit_direct(
    inbound.content,
    session_id=inbound.session_id,
    source=f"gateway:{inbound.channel}",
    user_id=inbound.user_id or (channel_identity.user_id if channel_identity else None),
    title=inbound.title,
    execution_context=inbound.execution_context,
    model=inbound.model,
    provider_name=inbound.provider_name,
    embedding_model=inbound.embedding_model,
    channel_identity=channel_identity,
)
In AgentLoop._process_direct_impl(), add a keyword parameter:
channel_identity: ChannelIdentity | None = None,
Import ChannelIdentity. When constructing SessionContext, include:
channel=channel_identity.channel_id if channel_identity else None,
channel_kind=channel_identity.kind if channel_identity else None,
account_id=channel_identity.account_id if channel_identity else None,
peer_id=channel_identity.peer_id if channel_identity else None,
peer_type=channel_identity.peer_type if channel_identity else None,
chat_id=channel_identity.peer_id if channel_identity else None,
thread_id=channel_identity.thread_id if channel_identity else None,
- Step 6: Extend SessionContext rendering
In app-instance/backend/beaver/engine/context/builder.py, add fields:
channel_kind: str | None = None
account_id: str | None = None
peer_id: str | None = None
peer_type: str | None = None
thread_id: str | None = None
In _render_session_section(), add:
if session_context.channel_kind:
    rows.append(f"Channel Kind: {session_context.channel_kind}")
if session_context.account_id:
    rows.append(f"Account ID: {session_context.account_id}")
if session_context.peer_id:
    rows.append(f"Peer ID: {session_context.peer_id}")
if session_context.peer_type:
    rows.append(f"Peer Type: {session_context.peer_type}")
if session_context.thread_id:
    rows.append(f"Thread ID: {session_context.thread_id}")
Do not render raw_channel_payload.
- Step 7: Verify identity tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_imports.py -q
Expected: pass after updating import assertions if needed.
Task 3: Dedupe Store and Event Log
Files:
- Create: app-instance/backend/beaver/interfaces/channels/state.py
- Modify: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Add failing state tests
Append to test_channel_runtime.py:
from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog


def test_dedupe_store_tracks_processing_and_done(tmp_path) -> None:
    store = ChannelDedupeStore(tmp_path / "dedupe.json", retention_hours=48)

    created = store.mark_processing(
        dedupe_key="webhook-dev:local:demo:msg-1",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
    )
    duplicate = store.mark_processing(
        dedupe_key="webhook-dev:local:demo:msg-1",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
    )

    assert created.created is True
    assert duplicate.created is False
    assert duplicate.record is not None
    assert duplicate.record["status"] == "processing"

    store.mark_done(
        dedupe_key="webhook-dev:local:demo:msg-1",
        run_id="run-1",
        reply="hello" * 10000,
        max_reply_chars=20,
    )
    done = store.get("webhook-dev:local:demo:msg-1")

    assert done is not None
    assert done["status"] == "done"
    assert done["reply"] == "hellohellohellohello"


def test_channel_event_log_writes_recent_events(tmp_path) -> None:
    log = ChannelEventLog(tmp_path / "events.jsonl")

    log.record(
        channel_id="webhook-dev",
        kind="inbound_accepted",
        session_id="webhook-dev:local:demo",
        message_id="msg-1",
        status="ok",
        text="hello world",
    )
    events = log.recent(channel_id="webhook-dev", limit=10)

    assert len(events) == 1
    assert events[0]["kind"] == "inbound_accepted"
    assert events[0]["text_preview"] == "hello world"
    assert "raw_channel_payload" not in events[0]
- Step 2: Run failing state tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q
Expected: fail because state.py does not exist.
- Step 3: Implement state.py
Create app-instance/backend/beaver/interfaces/channels/state.py with:
from __future__ import annotations

import json
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any
from uuid import uuid4


def _now_ms() -> int:
    return int(time.time() * 1000)


def _iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(slots=True)
class DedupeWriteResult:
    created: bool
    record: dict[str, Any] | None = None


class ChannelDedupeStore:
    def __init__(self, path: Path, *, retention_hours: int = 48) -> None:
        self.path = path
        self.retention_ms = max(1, int(retention_hours)) * 60 * 60 * 1000
        self._lock = Lock()

    def _load(self) -> dict[str, Any]:
        if not self.path.exists():
            return {"records": {}}
        try:
            data = json.loads(self.path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            return {"records": {}}
        return data if isinstance(data, dict) and isinstance(data.get("records"), dict) else {"records": {}}

    def _save(self, data: dict[str, Any]) -> None:
        self.path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.path.with_name(f"{self.path.name}.tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)

    def _prune_unlocked(self, data: dict[str, Any], now_ms: int) -> None:
        records = data.get("records", {})
        expired_before = now_ms - self.retention_ms
        for key, record in list(records.items()):
            updated_at_ms = int(record.get("updated_at_ms") or record.get("created_at_ms") or 0)
            if updated_at_ms < expired_before:
                records.pop(key, None)

    def get(self, dedupe_key: str) -> dict[str, Any] | None:
        with self._lock:
            data = self._load()
            self._prune_unlocked(data, _now_ms())
            return data["records"].get(dedupe_key)

    def mark_processing(self, *, dedupe_key: str, session_id: str, message_id: str) -> DedupeWriteResult:
        with self._lock:
            data = self._load()
            now_ms = _now_ms()
            self._prune_unlocked(data, now_ms)
            existing = data["records"].get(dedupe_key)
            if existing is not None:
                self._save(data)
                return DedupeWriteResult(created=False, record=existing)
            record = {
                "dedupe_key": dedupe_key,
                "status": "processing",
                "session_id": session_id,
                "message_id": message_id,
                "run_id": None,
                "reply": None,
                "error": None,
                "created_at_ms": now_ms,
                "updated_at_ms": now_ms,
            }
            data["records"][dedupe_key] = record
            self._save(data)
            return DedupeWriteResult(created=True, record=record)

    def mark_done(self, *, dedupe_key: str, run_id: str | None, reply: str, max_reply_chars: int) -> None:
        self._mark_result(
            dedupe_key=dedupe_key,
            status="done",
            run_id=run_id,
            reply=reply[: max(0, int(max_reply_chars))],
            error=None,
        )

    def mark_error(self, *, dedupe_key: str, error: str, max_error_chars: int) -> None:
        self._mark_result(
            dedupe_key=dedupe_key,
            status="error",
            run_id=None,
            reply=None,
            error=error[: max(0, int(max_error_chars))],
        )

    def _mark_result(
        self,
        *,
        dedupe_key: str,
        status: str,
        run_id: str | None,
        reply: str | None,
        error: str | None,
    ) -> None:
        with self._lock:
            data = self._load()
            record = data["records"].get(dedupe_key)
            if record is None:
                record = {"dedupe_key": dedupe_key, "created_at_ms": _now_ms()}
                data["records"][dedupe_key] = record
            record.update(
                {
                    "status": status,
                    "run_id": run_id,
                    "reply": reply,
                    "error": error,
                    "updated_at_ms": _now_ms(),
                }
            )
            self._save(data)


class ChannelEventLog:
    def __init__(self, path: Path) -> None:
        self.path = path
        self._lock = Lock()

    def record(
        self,
        *,
        channel_id: str,
        kind: str,
        session_id: str | None = None,
        message_id: str | None = None,
        run_id: str | None = None,
        status: str = "ok",
        error: str | None = None,
        text: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        entry = {
            "event_id": uuid4().hex,
            "channel_id": channel_id,
            "kind": kind,
            "session_id": session_id,
            "message_id": message_id,
            "run_id": run_id,
            "status": status,
            "error": error,
            "text_preview": text[:120] if text else None,
            "text_length": len(text) if text else 0,
            "metadata": metadata or {},
            "created_at": _iso_now(),
        }
        with self._lock:
            self.path.parent.mkdir(parents=True, exist_ok=True)
            with self.path.open("a", encoding="utf-8") as handle:
                handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
        return entry

    def recent(self, *, channel_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]:
        if not self.path.exists():
            return []
        lines = self.path.read_text(encoding="utf-8").splitlines()
        items: list[dict[str, Any]] = []
        for line in reversed(lines):
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if channel_id and item.get("channel_id") != channel_id:
                continue
            items.append(item)
            if len(items) >= max(1, int(limit)):
                break
        return list(reversed(items))
- Step 4: Verify state tests pass
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q
Expected: pass.
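The retention rule inside the dedupe store is easier to test when the clock is passed in. The sketch below restates the pruning step as a pure function under that assumption; prune_expired and HOUR_MS are hypothetical names, and the plan's store reads the wall clock instead.

```python
from typing import Any

HOUR_MS = 60 * 60 * 1000


def prune_expired(
    records: dict[str, dict[str, Any]],
    *,
    now_ms: int,
    retention_hours: int = 48,
) -> list[str]:
    """Drop records last touched before the retention window; return removed keys."""
    expired_before = now_ms - max(1, retention_hours) * HOUR_MS
    removed = [
        key
        for key, record in records.items()
        if int(record.get("updated_at_ms") or record.get("created_at_ms") or 0) < expired_before
    ]
    for key in removed:
        del records[key]
    return removed


now = 100 * HOUR_MS
records = {
    "old": {"updated_at_ms": 51 * HOUR_MS},    # 49 hours ago: expired
    "edge": {"updated_at_ms": 52 * HOUR_MS},   # exactly 48 hours ago: kept
    "fresh": {"updated_at_ms": 53 * HOUR_MS},  # 47 hours ago: kept
}
removed = prune_expired(records, now_ms=now)
```

The comparison is strict, so a record touched exactly at the retention boundary survives one more pass. A record with no timestamps at all is treated as timestamp 0 and is pruned.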
Task 4: Adapter Protocol and ChannelManager Migration
Files:
- Modify: app-instance/backend/beaver/interfaces/channels/base.py
- Modify: app-instance/backend/beaver/interfaces/channels/manager.py
- Modify: app-instance/backend/beaver/interfaces/channels/memory.py
- Modify: app-instance/backend/tests/unit/test_gateway_channels.py
- Modify: app-instance/backend/tests/unit/test_imports.py
- Step 1: Write failing manager tests for new protocol
Update the manager-focused parts of test_gateway_channels.py to use channel_id instead of name and no adapter bus:
def test_channel_manager_dispatches_by_channel_id() -> None:
    class CaptureChannel:
        channel_id = "webhook-dev"
        kind = "webhook"
        mode = "webhook"

        def __init__(self) -> None:
            self.sent = []

        async def start(self) -> None:
            pass

        async def stop(self) -> None:
            pass

        async def send(self, message: Any) -> None:
            self.sent.append(message)

    async def run() -> None:
        bus = MessageBus()
        channel = CaptureChannel()
        manager = ChannelManager(bus)
        manager.register(channel)
        await bus.publish_outbound(
            OutboundMessage(
                channel="webhook-dev",
                content="ok",
                session_id="webhook-dev:local:demo",
                finish_reason="stop",
            )
        )
        stop_event = asyncio.Event()
        stop_event.set()
        await manager.dispatch_outbound(stop_event)
        assert channel.sent[0].content == "ok"

    asyncio.run(run())
- Step 2: Run failing manager test
Run:
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py::test_channel_manager_dispatches_by_channel_id -q
Expected: fail because current protocol uses name and bus checks.
- Step 3: Update adapter protocol
Replace ChannelAdapter in base.py with:
class ChannelAdapter(Protocol):
    """Minimal contract every runtime channel adapter must implement."""

    channel_id: str
    kind: str
    mode: str

    async def start(self) -> None:
        """Prepare the channel before messages are routed."""

    async def stop(self) -> None:
        """Stop accepting/routing channel messages."""

    async def send(self, message: OutboundMessage) -> None:
        """Deliver an outbound message to the concrete channel."""


class ChannelInboundSink(Protocol):
    async def accept_inbound(self, message: InboundMessage) -> Any:
        """Accept a normalized inbound message from an adapter."""
Import InboundMessage.
- Step 4: Update ChannelManager
In manager.py:
    def register(self, channel: ChannelAdapter) -> None:
        if self.started:
            raise RuntimeError("Cannot register channels after ChannelManager.start()")
        if channel.channel_id in self.channels:
            raise ValueError(f"Channel already registered: {channel.channel_id}")
        self.channels[channel.channel_id] = channel
The rest of dispatch remains keyed by message.channel.
- Step 5: Migrate memory adapter to new protocol
Change MemoryChannelAdapter.__init__ signature:
    def __init__(
        self,
        inbound_sink: ChannelInboundSink,
        *,
        channel_id: str = "memory-dev",
        kind: str = "memory",
        mode: str = "webhook",
        account_id: str = "memory",
    ) -> None:
        self.channel_id = channel_id
        self.kind = kind
        self.mode = mode
        self.account_id = account_id
        self.inbound_sink = inbound_sink
        self.started = False
        self.sent_messages: list[OutboundMessage] = []
Update publish_external_text() to build ChannelIdentity and call self.inbound_sink.accept_inbound(message).
- Step 6: Verify existing gateway tests after migration
Run:
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_imports.py -q
Expected: some old tests will need assertion updates to new session id format. Update all expected session ids to <channel_id>:<account_id>:<peer_id>[:<thread_id>] and remove old telegram:chat-1 expectations.
Task 5: ChannelRuntime Core
Files:
- Create: app-instance/backend/beaver/interfaces/channels/runtime.py
- Modify: app-instance/backend/beaver/interfaces/channels/__init__.py
- Modify: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Add failing runtime tests
Append:
from beaver.foundation.config.schema import ChannelConfig
from beaver.foundation.events import MessageBus
from beaver.interfaces.channels.runtime import ChannelRuntime


class FakeAgentService:
    is_running = True

    async def handle_inbound_message(self, inbound):
        return OutboundMessage(
            message_id=inbound.message_id,
            channel=inbound.channel,
            content=f"echo:{inbound.content}",
            session_id=inbound.session_id,
            finish_reason="stop",
            channel_identity=inbound.channel_identity,
        )


def test_channel_runtime_accept_inbound_normalizes_session_and_dedupes(tmp_path) -> None:
    async def run() -> None:
        bus = MessageBus()
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            bus=bus,
            workspace=tmp_path,
            channels={},
        )
        identity = ChannelIdentity(
            channel_id="webhook-dev",
            kind="webhook",
            account_id="local",
            peer_id="demo",
            message_id="msg-1",
        )
        result = await runtime.accept_inbound(
            InboundMessage(
                channel="webhook-dev",
                content="hello",
                session_id="wrong",
                channel_identity=identity,
            )
        )
        duplicate = await runtime.accept_inbound(
            InboundMessage(
                channel="webhook-dev",
                content="hello",
                channel_identity=identity,
            )
        )
        queued = await bus.consume_inbound()

        assert result.accepted is True
        assert queued.session_id == "webhook-dev:local:demo"
        assert duplicate.accepted is False
        assert duplicate.duplicate is True

    asyncio.run(run())
- Step 2: Run failing runtime test
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_accept_inbound_normalizes_session_and_dedupes -q
Expected: fail because ChannelRuntime does not exist.
- Step 3: Implement runtime result models and constructor
Create runtime.py with:
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from beaver.foundation.config.schema import ChannelConfig
from beaver.foundation.events import InboundMessage, MessageBus
from beaver.interfaces.channels import ChannelAdapter, ChannelManager
from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog
from beaver.services.agent_service import AgentService


@dataclass(slots=True)
class ChannelAcceptResult:
    accepted: bool
    duplicate: bool = False
    pending: bool = False
    rejected: bool = False
    session_id: str | None = None
    dedupe_key: str | None = None
    record: dict[str, Any] | None = None
    error: str | None = None


class ChannelRuntime:
    def __init__(
        self,
        *,
        service: AgentService,
        workspace: Path,
        channels: dict[str, ChannelConfig],
        bus: MessageBus | None = None,
    ) -> None:
        self.service = service
        self.workspace = workspace
        self.bus = bus or MessageBus()
        self.manager = ChannelManager(self.bus)
        self.channel_configs = dict(channels)
        self.adapters: dict[str, ChannelAdapter] = {}
        self.states: dict[str, dict[str, Any]] = {}
        state_dir = workspace / "state" / "channels"
        self.dedupe = ChannelDedupeStore(state_dir / "dedupe.json")
        self.events = ChannelEventLog(state_dir / "events.jsonl")
        self._bridge_task: asyncio.Task[None] | None = None
        self._dispatch_task: asyncio.Task[None] | None = None
        self._stop_event = asyncio.Event()
        self._dispatch_stop_event = asyncio.Event()
- Step 4: Implement accept_inbound()
Add:
    async def accept_inbound(self, message: InboundMessage) -> ChannelAcceptResult:
        identity = message.channel_identity
        if identity is None:
            self.events.record(channel_id=message.channel, kind="inbound_rejected", status="error", error="channel_identity is required")
            return ChannelAcceptResult(accepted=False, rejected=True, error="channel_identity is required")
        validation_error = identity.validation_error()
        if validation_error:
            self.events.record(channel_id=identity.channel_id, kind="inbound_rejected", status="error", error=validation_error)
            return ChannelAcceptResult(accepted=False, rejected=True, error=validation_error)
        expected_session_id = identity.session_id()
        if message.session_id != expected_session_id:
            self.events.record(
                channel_id=identity.channel_id,
                kind="session_id_normalized",
                session_id=expected_session_id,
                message_id=identity.message_id,
                status="ok",
            )
        message.session_id = expected_session_id
        message.channel = identity.channel_id
        dedupe_key = identity.dedupe_key()
        if dedupe_key:
            write = self.dedupe.mark_processing(
                dedupe_key=dedupe_key,
                session_id=expected_session_id,
                message_id=identity.message_id or "",
            )
            if not write.created:
                record = write.record or {}
                self.events.record(
                    channel_id=identity.channel_id,
                    kind="inbound_duplicate",
                    session_id=expected_session_id,
                    message_id=identity.message_id,
                    status=str(record.get("status") or "processing"),
                )
                return ChannelAcceptResult(
                    accepted=False,
                    duplicate=True,
                    pending=record.get("status") == "processing",
                    session_id=expected_session_id,
                    dedupe_key=dedupe_key,
                    record=record,
                )
        self.events.record(
            channel_id=identity.channel_id,
            kind="inbound_accepted",
            session_id=expected_session_id,
            message_id=identity.message_id,
            status="ok",
            text=message.content,
        )
        await self.bus.publish_inbound(message)
        return ChannelAcceptResult(
            accepted=True,
            session_id=expected_session_id,
            dedupe_key=dedupe_key,
        )
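The normalization above relies on ChannelIdentity.validation_error() and ChannelIdentity.session_id(), which are defined in an earlier task. As a rough standalone illustration of the locked `<channel_id>:<account_id>:<peer_id>[:<thread_id>]` format, the sketch below uses a hypothetical `IdentitySketch` dataclass. Its field set and error messages are assumptions, not the plan's actual model.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class IdentitySketch:
    """Hypothetical stand-in for ChannelIdentity, for illustration only."""

    channel_id: str
    account_id: str
    peer_id: str
    thread_id: str | None = None

    def validation_error(self) -> str | None:
        # Assumption: the three base parts are required, thread_id is optional.
        for name in ("channel_id", "account_id", "peer_id"):
            if not getattr(self, name).strip():
                return f"{name} is required"
        return None

    def session_id(self) -> str:
        # Join the required parts, then append thread_id only when present.
        parts = [self.channel_id, self.account_id, self.peer_id]
        if self.thread_id:
            parts.append(self.thread_id)
        return ":".join(parts)
```

Because the runtime derives the session id from identity fields alone, an adapter-supplied session_id such as "wrong" in the test above never reaches the agent.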
- Step 5: Implement bridge and dispatch lifecycle
Add:
    async def start(self) -> None:
        self._stop_event.clear()
        self._dispatch_stop_event.clear()
        self._bridge_task = asyncio.create_task(self._bridge_inbound_to_agent())
        self._dispatch_task = asyncio.create_task(self.manager.dispatch_outbound(self._dispatch_stop_event))

    async def stop(self) -> None:
        self._stop_event.set()
        self._dispatch_stop_event.set()
        for task in (self._bridge_task, self._dispatch_task):
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        await self.manager.stop()

    async def _bridge_inbound_to_agent(self) -> None:
        while not self._stop_event.is_set():
            try:
                inbound = await asyncio.wait_for(self.bus.consume_inbound(), timeout=0.25)
            except asyncio.TimeoutError:
                continue
            identity = inbound.channel_identity
            try:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_started",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                )
                outbound = await self.service.handle_inbound_message(inbound)
            except Exception as exc:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_failed",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    status="error",
                    error=str(exc),
                )
                outbound = AgentService.build_outbound_error(inbound, detail=str(exc), finish_reason="error")
            else:
                self.events.record(
                    channel_id=outbound.channel,
                    kind="direct_run_finished",
                    session_id=outbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    run_id=outbound.run_id,
                )
            await self.bus.publish_outbound(outbound)
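The bridge loop polls with a short timeout so that it can notice the stop event even when no messages arrive, and it turns handler failures into outbound error messages instead of dropping them. A minimal sketch of that pattern follows. It assumes the bus behaves like a pair of `asyncio.Queue` objects (the plan does not show `MessageBus` internals) and uses a hypothetical `handle` coroutine in place of AgentService.handle_inbound_message().

```python
import asyncio
from typing import Awaitable, Callable


async def bridge(
    inbound: asyncio.Queue,
    outbound: asyncio.Queue,
    stop_event: asyncio.Event,
    handle: Callable[[str], Awaitable[str]],
) -> None:
    while not stop_event.is_set():
        try:
            message = await asyncio.wait_for(inbound.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # Nothing queued; loop again and re-check the stop event.
        try:
            reply = await handle(message)
        except Exception as exc:
            reply = f"error:{exc}"  # Failures still produce an outbound message.
        await outbound.put(reply)


async def demo() -> list:
    inbound: asyncio.Queue = asyncio.Queue()
    outbound: asyncio.Queue = asyncio.Queue()
    stop_event = asyncio.Event()

    async def handle(text: str) -> str:
        if text == "boom":
            raise RuntimeError("failed")
        return f"echo:{text}"

    task = asyncio.create_task(bridge(inbound, outbound, stop_event, handle))
    await inbound.put("hello")
    await inbound.put("boom")
    replies = [await outbound.get(), await outbound.get()]
    stop_event.set()
    await task  # The loop exits within one poll interval.
    return replies
```

Running `asyncio.run(demo())` processes both messages in order and then shuts the loop down through the stop event alone.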
- Step 6: Verify runtime tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py -q
Expected: pass for identity, state, and basic runtime tests.
Task 6: Generic Webhook Reference Adapter
Files:
- Create: app-instance/backend/beaver/interfaces/channels/generic_webhook.py
- Modify: app-instance/backend/beaver/interfaces/channels/runtime.py
- Modify: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Add failing generic webhook roundtrip test
Append:
from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter


def test_generic_webhook_adapter_waits_for_outbound_reply(tmp_path) -> None:
    async def run() -> None:
        bus = MessageBus()
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            bus=bus,
            workspace=tmp_path,
            channels={},
        )
        adapter = GenericWebhookAdapter(
            channel_id="webhook-dev",
            kind="webhook",
            mode="webhook",
            account_id="local",
            display_name="Webhook Dev",
            inbound_sink=runtime,
            response_timeout_seconds=1,
        )
        runtime.manager.register(adapter)
        await runtime.start()
        try:
            response = await adapter.handle_webhook_payload(
                {
                    "peer_id": "demo",
                    "message_id": "msg-1",
                    "text": "hello",
                    "peer_type": "dm",
                }
            )
        finally:
            await runtime.stop()
        assert response["ok"] is True
        assert response["reply"] == "echo:hello"
        assert response["session_id"] == "webhook-dev:local:demo"

    asyncio.run(run())
- Step 2: Run failing adapter test
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q
Expected: fail because generic adapter does not exist.
- Step 3: Implement generic adapter
Create generic_webhook.py:
from __future__ import annotations

import asyncio
from typing import Any

from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage
from beaver.interfaces.channels.base import ChannelInboundSink


class GenericWebhookAdapter:
    def __init__(
        self,
        *,
        channel_id: str,
        kind: str,
        mode: str,
        account_id: str,
        display_name: str = "",
        inbound_sink: ChannelInboundSink,
        response_timeout_seconds: float = 1800,
    ) -> None:
        self.channel_id = channel_id
        self.kind = kind
        self.mode = mode
        self.account_id = account_id
        self.display_name = display_name or channel_id
        self.inbound_sink = inbound_sink
        self.response_timeout_seconds = max(1.0, float(response_timeout_seconds))
        self.started = False
        self._pending: dict[str, asyncio.Future[OutboundMessage]] = {}

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False
        for future in list(self._pending.values()):
            if not future.done():
                future.cancel()
        self._pending.clear()

    async def handle_webhook_payload(self, payload: dict[str, Any]) -> dict[str, Any]:
        text = str(payload.get("text") or "").strip()
        peer_id = str(payload.get("peer_id") or "").strip()
        message_id = str(payload.get("message_id") or "").strip()
        thread_id = str(payload.get("thread_id") or "").strip() or None
        peer_type = str(payload.get("peer_type") or "unknown").strip() or "unknown"
        user_id = str(payload.get("user_id") or "").strip() or None
        if not text:
            return {"ok": False, "error": "text is required"}
        if not peer_id:
            return {"ok": False, "error": "peer_id is required"}
        if not message_id:
            return {"ok": False, "error": "message_id is required"}
        identity = ChannelIdentity(
            channel_id=self.channel_id,
            kind=self.kind,
            account_id=self.account_id,
            peer_id=peer_id,
            thread_id=thread_id,
            peer_type=peer_type,
            user_id=user_id,
            message_id=message_id,
        )
        inbound = InboundMessage(
            channel=self.channel_id,
            content=text,
            user_id=user_id,
            channel_identity=identity,
            metadata={"webhook": {"peer_type": peer_type}},
        )
        future = asyncio.get_running_loop().create_future()
        self._pending[inbound.message_id] = future
        accept = await self.inbound_sink.accept_inbound(inbound)
        if not accept.accepted:
            self._pending.pop(inbound.message_id, None)
            record = accept.record or {}
            return {
                "ok": accept.error is None,
                "duplicate": accept.duplicate,
                "pending": accept.pending,
                "session_id": accept.session_id,
                "status": record.get("status"),
                "run_id": record.get("run_id"),
                "reply": record.get("reply"),
                "error": accept.error or record.get("error"),
            }
        try:
            outbound = await asyncio.wait_for(future, timeout=self.response_timeout_seconds)
        except asyncio.TimeoutError:
            self._pending.pop(inbound.message_id, None)
            return {
                "ok": True,
                "duplicate": False,
                "pending": True,
                "session_id": accept.session_id,
            }
        return {
            "ok": outbound.finish_reason != "error",
            "duplicate": False,
            "pending": False,
            "session_id": outbound.session_id,
            "run_id": outbound.run_id,
            "reply": outbound.content,
            "error": outbound.metadata.get("error"),
        }

    async def send(self, message: OutboundMessage) -> None:
        future = self._pending.pop(message.message_id, None)
        if future is None or future.done():
            return
        future.set_result(message)
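The adapter turns the asynchronous bus round trip into a synchronous HTTP response by parking a future under the message id and letting send() resolve it. A minimal sketch of that correlation pattern in isolation follows. `PendingReplies` and its method names are hypothetical and exist only for illustration; replies are plain strings instead of OutboundMessage objects.

```python
from __future__ import annotations

import asyncio


class PendingReplies:
    """Hypothetical helper: correlate a request with its later reply by message id."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future[str]] = {}

    async def wait_for_reply(self, message_id: str, timeout: float) -> str | None:
        future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
        self._pending[message_id] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            return None  # Caller reports "pending"; the run itself is not cancelled.
        finally:
            self._pending.pop(message_id, None)

    def deliver(self, message_id: str, reply: str) -> bool:
        future = self._pending.pop(message_id, None)
        if future is None or future.done():
            return False  # Late or unknown reply: nobody is waiting any more.
        future.set_result(reply)
        return True


async def demo() -> tuple[str | None, str | None, bool]:
    replies = PendingReplies()
    loop = asyncio.get_running_loop()
    # Simulate the dispatcher delivering a reply shortly after the request.
    loop.call_later(0.01, replies.deliver, "msg-1", "echo:hello")
    answered = await replies.wait_for_reply("msg-1", timeout=1.0)
    timed_out = await replies.wait_for_reply("msg-2", timeout=0.01)
    late = replies.deliver("msg-2", "too late")
    return answered, timed_out, late
```

The `False` return from `deliver` marks the spot where a runtime could record a late reply, since the waiter has already returned a pending response.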
- Step 4: Mark dedupe done/error from runtime dispatch
In ChannelRuntime._bridge_inbound_to_agent(), once outbound exists, mark the dedupe record as done or error when the inbound identity has a dedupe_key:
            dedupe_key = identity.dedupe_key() if identity else None
            if dedupe_key:
                if outbound.finish_reason == "error":
                    self.dedupe.mark_error(
                        dedupe_key=dedupe_key,
                        error=outbound.content,
                        max_error_chars=4000,
                    )
                else:
                    self.dedupe.mark_done(
                        dedupe_key=dedupe_key,
                        run_id=outbound.run_id,
                        reply=outbound.content,
                        max_reply_chars=20000,
                    )
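The dedupe record moves from processing to done or error, and a repeated message_id gets the stored record back instead of a second run. ChannelDedupeStore itself is defined in an earlier task, so the following is only an in-memory sketch of that state machine. `InMemoryDedupe`, `DedupeWrite`, and the record layout are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class DedupeWrite:
    created: bool
    record: dict[str, Any]


class InMemoryDedupe:
    """Hypothetical in-memory stand-in for ChannelDedupeStore."""

    def __init__(self) -> None:
        self._records: dict[str, dict[str, Any]] = {}

    def mark_processing(self, dedupe_key: str) -> DedupeWrite:
        existing = self._records.get(dedupe_key)
        if existing is not None:
            # Repeat delivery: report the stored state, do not create a new record.
            return DedupeWrite(created=False, record=existing)
        record = {"status": "processing"}
        self._records[dedupe_key] = record
        return DedupeWrite(created=True, record=record)

    def mark_done(self, dedupe_key: str, reply: str, max_reply_chars: int = 20000) -> None:
        # Truncate so the cache does not grow with very long replies.
        self._records[dedupe_key] = {"status": "done", "reply": reply[:max_reply_chars]}

    def mark_error(self, dedupe_key: str, error: str, max_error_chars: int = 4000) -> None:
        self._records[dedupe_key] = {"status": "error", "error": error[:max_error_chars]}
```

A caller that sees `created=False` can answer from `record` directly: status "processing" maps to a pending response, while "done" and "error" carry the cached outcome.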
- Step 5: Record delivery events in ChannelManager or runtime wrapper
Wrap adapter send() in ChannelManager.dispatch_outbound(), or introduce a runtime-managed dispatch callback. A minimal v1 implementation can keep the manager's existing catch-and-append-to-undeliverable behavior, but ChannelRuntime must still expose delivery events. If the manager remains event-unaware, add optional callbacks:
    # Needs: from typing import Awaitable, Callable
    async def dispatch_outbound(
        self,
        stop_event: asyncio.Event,
        on_delivered: Callable[[OutboundMessage], Awaitable[None]] | None = None,
        on_failed: Callable[[OutboundMessage, Exception | None], Awaitable[None]] | None = None,
    ) -> None:
Use callbacks in ChannelRuntime.start() to record outbound_delivered and outbound_delivery_failed.
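A minimal sketch of how such callbacks could be invoked from a dispatch loop follows. The real ChannelManager.dispatch_outbound() body is not shown in this plan, so the queue, the `send` parameter, and the loop structure here are assumptions; only the two callback parameters mirror the signature above.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Delivered = Callable[[str], Awaitable[None]]
Failed = Callable[[str, Optional[Exception]], Awaitable[None]]


async def dispatch_outbound(
    queue: asyncio.Queue,
    send: Callable[[str], Awaitable[None]],
    stop_event: asyncio.Event,
    on_delivered: Optional[Delivered] = None,
    on_failed: Optional[Failed] = None,
) -> None:
    while not stop_event.is_set():
        try:
            message = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue
        try:
            await send(message)
        except Exception as exc:
            # Delivery failed: report it, but keep the loop alive.
            if on_failed is not None:
                await on_failed(message, exc)
        else:
            if on_delivered is not None:
                await on_delivered(message)


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    stop_event = asyncio.Event()
    events: list = []

    async def send(message: str) -> None:
        if message == "bad":
            raise ConnectionError("adapter down")

    async def delivered(message: str) -> None:
        events.append(("outbound_delivered", message))

    async def failed(message: str, exc: Optional[Exception]) -> None:
        events.append(("outbound_delivery_failed", message))

    task = asyncio.create_task(dispatch_outbound(queue, send, stop_event, delivered, failed))
    await queue.put("ok")
    await queue.put("bad")
    while len(events) < 2:
        await asyncio.sleep(0.01)
    stop_event.set()
    await task
    return events
```

Keeping the callbacks optional leaves the manager usable without a runtime, which matches the "event-unaware" option described above.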
- Step 6: Verify generic webhook roundtrip
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q
Expected: pass.
Task 7: Runtime Adapter Factory and Status
Files:
- Modify: app-instance/backend/beaver/interfaces/channels/runtime.py
- Modify: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Add failing status/factory test
Append:
def test_channel_runtime_starts_enabled_generic_webhook_and_reports_status(tmp_path) -> None:
    async def run() -> None:
        runtime = ChannelRuntime(
            service=FakeAgentService(),
            workspace=tmp_path,
            channels={
                "webhook-dev": ChannelConfig(
                    enabled=True,
                    kind="webhook",
                    mode="webhook",
                    account_id="local",
                    display_name="Webhook Dev",
                    config={"response_timeout_seconds": 1800},
                ),
                "off": ChannelConfig(
                    enabled=False,
                    kind="webhook",
                    mode="webhook",
                    account_id="local",
                ),
            },
        )
        await runtime.start()
        try:
            statuses = runtime.statuses()
        finally:
            await runtime.stop()
        by_id = {item["channel_id"]: item for item in statuses}
        assert by_id["webhook-dev"]["state"] == "running"
        assert by_id["webhook-dev"]["webhook_url"] == "/api/channels/webhook-dev/webhook"
        assert by_id["off"]["state"] == "disabled"

    asyncio.run(run())
- Step 2: Implement adapter creation
In ChannelRuntime.start(), before starting bridge/dispatch tasks:
        for channel_id, cfg in self.channel_configs.items():
            if not cfg.enabled:
                self.states[channel_id] = {"state": "disabled", "last_error": None}
                continue
            try:
                adapter = self._build_adapter(channel_id, cfg)
                self.adapters[channel_id] = adapter
                self.manager.register(adapter)
                await adapter.start()
                self.states[channel_id] = {"state": "running", "last_error": None, "started_at": _iso_now()}
                self.events.record(channel_id=channel_id, kind="adapter_started")
            except Exception as exc:
                self.states[channel_id] = {"state": "error", "last_error": str(exc)}
                self.events.record(channel_id=channel_id, kind="adapter_error", status="error", error=str(exc))
Add _build_adapter():
    def _build_adapter(self, channel_id: str, cfg: ChannelConfig) -> ChannelAdapter:
        if cfg.kind != "webhook" or cfg.mode != "webhook":
            raise ValueError(f"Unsupported channel kind/mode: {cfg.kind}/{cfg.mode}")
        from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter

        return GenericWebhookAdapter(
            channel_id=channel_id,
            kind=cfg.kind,
            mode=cfg.mode,
            account_id=cfg.account_id,
            display_name=cfg.display_name,
            inbound_sink=self,
            response_timeout_seconds=float(cfg.config.get("response_timeout_seconds") or 1800),
        )
- Step 3: Implement status helpers
Add:
    def statuses(self) -> list[dict[str, Any]]:
        items: list[dict[str, Any]] = []
        recent = self.events.recent(limit=500)
        last_by_channel = {event["channel_id"]: event for event in recent if event.get("channel_id")}
        for channel_id, cfg in self.channel_configs.items():
            state = self.states.get(channel_id, {"state": "configured", "last_error": None})
            items.append(
                {
                    "channel_id": channel_id,
                    "name": channel_id,
                    "kind": cfg.kind,
                    "mode": cfg.mode,
                    "display_name": cfg.display_name or channel_id,
                    "enabled": cfg.enabled,
                    "state": state.get("state", "configured"),
                    "account_id": cfg.account_id,
                    "last_error": state.get("last_error"),
                    "started_at": state.get("started_at"),
                    "last_event_at": last_by_channel.get(channel_id, {}).get("created_at"),
                    "capabilities": ["receive_text", "send_text", "sync_webhook_response"]
                    if cfg.kind == "webhook"
                    else [],
                    "webhook_url": f"/api/channels/{channel_id}/webhook" if cfg.kind == "webhook" else None,
                }
            )
        return items

    def recent_events(self, channel_id: str, *, limit: int = 100) -> list[dict[str, Any]]:
        return self.events.recent(channel_id=channel_id, limit=limit)
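statuses() builds last_by_channel with a dict comprehension, so for each channel the entry that appears last in the list wins. That yields the newest event only if ChannelEventLog.recent() returns events oldest-first, which this section does not state. A small sketch of the behavior, using made-up event dicts:

```python
def last_event_by_channel(events: list) -> dict:
    # Later items overwrite earlier ones, so input order decides which event is kept.
    return {event["channel_id"]: event for event in events if event.get("channel_id")}


oldest_first = [
    {"channel_id": "webhook-dev", "created_at": "2026-06-01T10:00:00+00:00"},
    {"channel_id": "webhook-dev", "created_at": "2026-06-01T10:05:00+00:00"},
]
newest = last_event_by_channel(oldest_first)["webhook-dev"]["created_at"]
reversed_pick = last_event_by_channel(list(reversed(oldest_first)))["webhook-dev"]["created_at"]
```

With oldest-first input the comprehension keeps the 10:05 event; with the same list reversed it keeps the 10:00 event, so last_event_at would silently report a stale timestamp if the ordering assumption is wrong.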
- Step 4: Verify status/factory tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_starts_enabled_generic_webhook_and_reports_status -q
Expected: pass.
Task 8: Web Backend Integration and Routes
Files:
- Modify: app-instance/backend/beaver/interfaces/web/app.py
- Modify: app-instance/backend/beaver/interfaces/gateway/main.py
- Modify: app-instance/backend/tests/unit/test_gateway_channels.py
- Step 1: Add failing Web API route test
Add to test_gateway_channels.py or a focused web app test file:
def test_web_app_exposes_channel_status_and_webhook(tmp_path) -> None:
    import json

    from fastapi.testclient import TestClient

    from beaver.foundation.config.loader import load_config
    from beaver.interfaces.web.app import create_app

    config_path = tmp_path / "config.json"
    workspace = tmp_path / "workspace"
    workspace.mkdir()
    config_path.write_text(
        json.dumps(
            {
                "agents": {"defaults": {"model": "openai/gpt-5"}},
                "providers": {},
                "channels": {
                    "webhook-dev": {
                        "enabled": True,
                        "kind": "webhook",
                        "mode": "webhook",
                        "accountId": "local",
                    }
                },
            }
        ),
        encoding="utf-8",
    )
    app = create_app(workspace=workspace, config_path=config_path)
    with TestClient(app) as client:
        status = client.get("/api/status").json()
    assert status["channels"][0]["channel_id"] == "webhook-dev"
    assert status["channels"][0]["state"] == "running"
Use fake provider/service injection if the default app startup requires external provider calls. The test should not call a real LLM.
- Step 2: Start ChannelRuntime in lifespan
In _app_lifespan, after AgentService.start() and config boot, create and start runtime:
loaded = attached_service.create_loop().boot()
channel_runtime = ChannelRuntime(
    service=attached_service,
    workspace=loaded.workspace,
    channels=loaded.config.channels,
)
fastapi_app.state.channel_runtime = channel_runtime
await channel_runtime.start()
On shutdown, stop it before or alongside cron shutdown:
runtime = getattr(fastapi_app.state, "channel_runtime", None)
if runtime is not None:
    await runtime.stop()
- Step 3: Add route helpers
Add:
def get_channel_runtime(request: Request) -> ChannelRuntime:
    runtime = getattr(request.app.state, "channel_runtime", None)
    if runtime is None:
        raise HTTPException(status_code=503, detail="Channel runtime is not running")
    return runtime
Routes:
@app.get("/api/channels")
async def list_channels(request: Request) -> list[dict[str, Any]]:
    return get_channel_runtime(request).statuses()


@app.get("/api/channels/{channel_id}/events")
async def list_channel_events(channel_id: str, request: Request, limit: int = 100) -> list[dict[str, Any]]:
    return get_channel_runtime(request).recent_events(channel_id, limit=limit)


@app.post("/api/channels/{channel_id}/webhook")
async def post_channel_webhook(channel_id: str, request: Request) -> JSONResponse:
    runtime = get_channel_runtime(request)
    adapter = runtime.adapters.get(channel_id)
    if adapter is None or not hasattr(adapter, "handle_webhook_payload"):
        raise HTTPException(status_code=404, detail="Webhook channel not found")
    try:
        payload = await request.json()
    except ValueError:
        # A malformed body should be a client error, not an unhandled 500.
        raise HTTPException(status_code=400, detail="Webhook payload must be valid JSON") from None
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object")
    result = await adapter.handle_webhook_payload(payload)
    # 202 signals that the run is still in progress (timeout or pending duplicate).
    status_code = 202 if result.get("pending") else 200
    return JSONResponse(result, status_code=status_code)
Add GET /api/channels/{channel_id}/webhook only if a platform challenge handshake needs it later. Generic v1 can omit GET.
- Step 4: Include runtime controls and channel status in /api/status
Replace hard-coded:
"channels": [{"name": "web", "enabled": True}],
with a runtime lookup placed before the response dict is built:
runtime = getattr(request.app.state, "channel_runtime", None)
and these entries inside the dict:
"channels": runtime.statuses() if runtime is not None else [],
"runtime_controls": {
    "self_restart": _self_restart_enabled(),
},
- Step 5: Make run_gateway() a compatibility wrapper
In gateway/main.py, keep public signature if practical, but route through ChannelRuntime for lifecycle. Existing tests that inject channels can build a runtime with those adapters registered or can be rewritten to target ChannelRuntime directly.
- Step 6: Verify backend route tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_channel_runtime.py -q
Expected: pass without real network or real LLM calls.
Task 9: Self-Restart API
Files:
- Modify: app-instance/backend/beaver/interfaces/web/app.py
- Modify: app-instance/create-instance.sh
- Modify: app-instance/backend/tests/unit/test_channel_runtime.py
- Step 1: Add restart status and disabled behavior tests
Add:
def test_self_restart_env_defaults_enabled(monkeypatch) -> None:
    from beaver.interfaces.web.app import _self_restart_enabled

    monkeypatch.delenv("BEAVER_ENABLE_SELF_RESTART", raising=False)
    assert _self_restart_enabled() is True


def test_self_restart_env_can_disable(monkeypatch) -> None:
    from beaver.interfaces.web.app import _self_restart_enabled

    monkeypatch.setenv("BEAVER_ENABLE_SELF_RESTART", "0")
    assert _self_restart_enabled() is False
- Step 2: Implement restart helpers
In web/app.py:
def _self_restart_enabled() -> bool:
    # Requires `import os` at module level. Any casing of "false" disables the control.
    return os.getenv("BEAVER_ENABLE_SELF_RESTART", "1").strip().lower() not in {"0", "false"}


def _schedule_self_restart(delay_seconds: float = 0.75) -> None:
    import threading
    import time

    def _exit_later() -> None:
        # Delay the exit so the HTTP 202 response can be sent first.
        time.sleep(delay_seconds)
        os._exit(0)

    threading.Thread(target=_exit_later, daemon=True).start()
Add route:
@app.post("/api/runtime/restart")
async def restart_runtime() -> JSONResponse:
    if not _self_restart_enabled():
        raise HTTPException(status_code=403, detail="Self restart is disabled")
    _schedule_self_restart()
    return JSONResponse({"ok": True, "restarting": True}, status_code=202)
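The restart helper responds first and exits shortly afterwards, presumably leaving it to the container's restart policy to bring the process back. Because os._exit cannot be exercised in a unit test, one option is to inject the exit action. The sketch below shows that delay pattern with a hypothetical `schedule_later` helper, which is not part of the plan.

```python
import threading
from typing import Callable


def schedule_later(delay_seconds: float, action: Callable[[], None]) -> threading.Timer:
    """Run `action` on a daemon timer thread after `delay_seconds`."""
    timer = threading.Timer(delay_seconds, action)
    timer.daemon = True  # Do not keep the process alive just for the timer.
    timer.start()
    return timer


# In a test, pass a harmless action instead of an exit call.
fired = threading.Event()
timer = schedule_later(0.01, fired.set)
timer.join(timeout=1.0)
```

In production the action would be a small function that calls `os._exit(0)`, while tests only check that the action fires after the delay.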
- Step 3: Ensure created containers keep default restart enabled
In app-instance/create-instance.sh, add to RUN_ARGS:
-e "BEAVER_ENABLE_SELF_RESTART=1"
- Step 4: Verify restart helper tests
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py::test_self_restart_env_defaults_enabled tests/unit/test_channel_runtime.py::test_self_restart_env_can_disable -q
Expected: pass.
Task 10: Nginx Timeout for Channel Webhooks
Files:
- Modify: app-instance/nginx.conf
- Step 1: Add channel webhook location before generic /api/
In app-instance/nginx.conf, before location /api/, add:
location /api/channels/ {
    proxy_pass http://127.0.0.1:18080;
    proxy_read_timeout 1800;
    proxy_send_timeout 1800;
}
- Step 2: Check nginx config syntax if nginx is available
Run:
nginx -t -c /home/ivan/xuan/beaver_project/app-instance/nginx.conf
Expected: syntax ok. If nginx is not installed locally, record that this check was skipped and rely on container verification later.
Task 11: Frontend API Types and Helpers
Files:
- Modify: app-instance/frontend/types/index.ts
- Modify: app-instance/frontend/lib/api.ts
- Step 1: Update types
Replace ChannelStatus with:
export interface ChannelStatus {
  channel_id: string;
  name?: string;
  kind: string;
  mode: string;
  display_name: string;
  enabled: boolean;
  state: 'configured' | 'disabled' | 'starting' | 'running' | 'degraded' | 'error' | 'stopped';
  account_id: string;
  last_error?: string | null;
  last_event_at?: string | null;
  started_at?: string | null;
  capabilities: string[];
  webhook_url?: string | null;
}

export interface ChannelEventRecord {
  event_id: string;
  channel_id: string;
  kind: string;
  session_id?: string | null;
  message_id?: string | null;
  run_id?: string | null;
  status: string;
  error?: string | null;
  text_preview?: string | null;
  text_length?: number;
  created_at: string;
  metadata?: Record<string, unknown>;
}

export interface RuntimeControls {
  self_restart: boolean;
}
Add to SystemStatus:
runtime_controls?: RuntimeControls;
- Step 2: Add API helpers
In app-instance/frontend/lib/api.ts:
export async function listChannelEvents(channelId: string, limit: number = 100): Promise<ChannelEventRecord[]> {
  return fetchJSON(`/api/channels/${encodeURIComponent(channelId)}/events?limit=${limit}`);
}

export async function restartRuntime(): Promise<{ ok: boolean; restarting: boolean }> {
  return fetchJSON('/api/runtime/restart', {
    method: 'POST',
    timeoutMs: 5000,
  });
}
Import ChannelEventRecord.
- Step 3: Typecheck frontend
Run:
cd app-instance/frontend
npm run typecheck
Expected: pass. If this repo does not define typecheck, run npx tsc --noEmit.
Task 12: Status Page Channels and Restart UI
Files:
- Modify: app-instance/frontend/app/(app)/status/page.tsx
- Step 1: Add UI state
Add imports:
import { listChannelEvents, restartRuntime } from '@/lib/api';
import type { ChannelEventRecord, ChannelStatus, ProviderStatus, SystemStatus } from '@/types';
Add state:
const [selectedChannel, setSelectedChannel] = useState<ChannelStatus | null>(null);
const [channelEvents, setChannelEvents] = useState<ChannelEventRecord[]>([]);
const [loadingChannelEvents, setLoadingChannelEvents] = useState(false);
const [restartOpen, setRestartOpen] = useState(false);
const [restarting, setRestarting] = useState(false);
const [restartError, setRestartError] = useState<string | null>(null);
- Step 2: Add handlers
const openChannelDetails = async (channel: ChannelStatus) => {
  setSelectedChannel(channel);
  setChannelEvents([]);
  setLoadingChannelEvents(true);
  try {
    setChannelEvents(await listChannelEvents(channel.channel_id, 20));
  } catch {
    setChannelEvents([]);
  } finally {
    setLoadingChannelEvents(false);
  }
};

const handleRestart = async () => {
  setRestarting(true);
  setRestartError(null);
  try {
    await restartRuntime();
    setRestartOpen(false);
    window.setTimeout(() => {
      void loadStatus();
    }, 5000);
  } catch (err: any) {
    setRestartError(err.message || pickAppText(locale, '重启失败', 'Restart failed'));
  } finally {
    setRestarting(false);
  }
};
- Step 3: Add restart button to Instance runtime card
Next to Runtime Logs:
{status.runtime_controls?.self_restart !== false ? (
  <Button variant="outline" onClick={() => setRestartOpen(true)}>
    <RefreshCw className="w-4 h-4 mr-2" />
    {pickAppText(locale, '重启实例', 'Restart instance')}
  </Button>
) : null}
- Step 4: Replace Channels card content
Replace the current chip grid with compact rows:
<div className="space-y-2">
  {status.channels.length === 0 ? (
    <p className="text-sm text-muted-foreground">
      {pickAppText(locale, '尚未配置通道', 'No channels configured')}
    </p>
  ) : (
    status.channels.map((ch) => (
      <button
        key={ch.channel_id}
        type="button"
        onClick={() => openChannelDetails(ch)}
        className="flex w-full items-center justify-between rounded-lg border px-3 py-2 text-left text-sm hover:bg-muted/40"
      >
        <span className="min-w-0">
          <span className="block truncate font-medium">{ch.display_name || ch.channel_id}</span>
          <span className="block truncate text-xs text-muted-foreground">
            {ch.channel_id} · {ch.kind}/{ch.mode} · {ch.account_id}
          </span>
        </span>
        <Badge variant={ch.state === 'running' ? 'default' : ch.state === 'error' ? 'destructive' : 'secondary'}>
          {ch.state}
        </Badge>
      </button>
    ))
  )}
</div>
- Step 5: Add channel details dialog
Add a dialog after the provider dialog:
<Dialog open={Boolean(selectedChannel)} onOpenChange={(open) => !open && setSelectedChannel(null)}>
  <DialogContent className="sm:max-w-[720px]">
    <DialogHeader>
      <DialogTitle>{selectedChannel?.display_name || selectedChannel?.channel_id}</DialogTitle>
      <DialogDescription>
        {selectedChannel ? `${selectedChannel.kind}/${selectedChannel.mode} · ${selectedChannel.channel_id}` : ''}
      </DialogDescription>
    </DialogHeader>
    {selectedChannel ? (
      <div className="space-y-4">
        <div className="grid gap-2 text-sm sm:grid-cols-2">
          <InfoRow label="State" value={selectedChannel.state} />
          <InfoRow label="Account" value={selectedChannel.account_id || '-'} />
          <InfoRow label="Webhook" value={selectedChannel.webhook_url || '-'} />
          <InfoRow label="Last error" value={selectedChannel.last_error || '-'} />
        </div>
        <div className="space-y-2">
          <p className="text-sm font-medium">{pickAppText(locale, '最近事件', 'Recent events')}</p>
          {loadingChannelEvents ? (
            <Loader2 className="h-4 w-4 animate-spin text-muted-foreground" />
          ) : (
            <div className="max-h-[320px] overflow-auto rounded-md border">
              {channelEvents.map((event) => (
                <div key={event.event_id} className="border-b px-3 py-2 text-xs last:border-b-0">
                  <div className="flex items-center justify-between gap-2">
                    <span className="font-medium">{event.kind}</span>
                    <span className="text-muted-foreground">{event.created_at}</span>
                  </div>
                  <div className="mt-1 text-muted-foreground">
                    {event.status}{event.error ? ` · ${event.error}` : ''}
                  </div>
                </div>
              ))}
              {channelEvents.length === 0 ? (
                <p className="px-3 py-6 text-sm text-muted-foreground">
                  {pickAppText(locale, '暂无事件', 'No events yet')}
                </p>
              ) : null}
            </div>
          )}
        </div>
      </div>
    ) : null}
  </DialogContent>
</Dialog>
- Step 6: Add restart confirmation dialog
<Dialog open={restartOpen} onOpenChange={setRestartOpen}>
  <DialogContent className="sm:max-w-[420px]">
    <DialogHeader>
      <DialogTitle>{pickAppText(locale, '重启实例?', 'Restart instance?')}</DialogTitle>
      <DialogDescription>
        {pickAppText(
          locale,
          '应用会短暂不可用,正在运行的对话和通道请求可能会中断。',
          'The app will be unavailable briefly. Running chats and channel requests may be interrupted.'
        )}
      </DialogDescription>
    </DialogHeader>
    {restartError ? <p className="text-sm text-destructive">{restartError}</p> : null}
    <DialogFooter>
      <Button variant="outline" onClick={() => setRestartOpen(false)} disabled={restarting}>
        {pickAppText(locale, '取消', 'Cancel')}
      </Button>
      <Button onClick={handleRestart} disabled={restarting}>
        {restarting ? <Loader2 className="mr-2 h-4 w-4 animate-spin" /> : null}
        {pickAppText(locale, '重启', 'Restart')}
      </Button>
    </DialogFooter>
  </DialogContent>
</Dialog>
- Step 7: Verify frontend
Run:
cd app-instance/frontend
npm test -- --run
npx tsc --noEmit
Expected: pass.
Task 13: End-to-End Backend Verification
Files:
- Modify: backend tests as needed.
- Step 1: Run backend unit tests for channel area
Run:
cd app-instance/backend
uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py tests/unit/test_config_loader.py tests/unit/test_imports.py -q
Expected: pass.
- Step 2: Run broader backend tests impacted by AgentService/session context
Run:
cd app-instance/backend
uv run pytest tests/unit/test_agent_loop.py tests/unit/test_initial_skill_tool_hints.py tests/unit/test_marketplace_and_mcp.py -q
Expected: pass.
- Step 3: Manually verify generic webhook in local app
Start backend/frontend using the existing app-instance workflow. With config containing:
{
  "channels": {
    "webhook-dev": {
      "enabled": true,
      "kind": "webhook",
      "mode": "webhook",
      "accountId": "local",
      "displayName": "Webhook Dev",
      "config": {
        "responseTimeoutSeconds": 1800
      },
      "secrets": {}
    }
  }
}
Send:
curl -sS -X POST http://127.0.0.1:18080/api/channels/webhook-dev/webhook \
-H 'Content-Type: application/json' \
-d '{"peer_id":"demo-user","thread_id":"main","message_id":"msg-001","text":"hello","peer_type":"dm"}'
Expected shape:
{
  "ok": true,
  "duplicate": false,
  "pending": false,
  "session_id": "webhook-dev:local:demo-user:main",
  "reply": "..."
}
Repeat the same curl. Expected duplicate: true and cached/pending response, with no second agent execution.
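For scripted checks, the same request can be built with the Python standard library. This is only a sketch of the curl call above: `build_webhook_request` is a hypothetical helper, and the host and port are copied from the curl example.

```python
import json
import urllib.request


def build_webhook_request(base_url: str, channel_id: str, payload: dict) -> urllib.request.Request:
    # Mirrors the curl call: POST with a JSON body to the channel's webhook URL.
    return urllib.request.Request(
        url=f"{base_url}/api/channels/{channel_id}/webhook",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_webhook_request(
    "http://127.0.0.1:18080",
    "webhook-dev",
    {
        "peer_id": "demo-user",
        "thread_id": "main",
        "message_id": "msg-001",
        "text": "hello",
        "peer_type": "dm",
    },
)
# To send it against a running backend:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(response.status, json.loads(response.read()))
```

Sending the same request object twice reproduces the duplicate check, since message_id stays "msg-001".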
- Step 4: Manually verify status UI
Open /status and confirm:
- Channel row shows webhook-dev, webhook/webhook, account local, state running.
- Details dialog shows webhook URL and recent events.
- Restart button opens confirmation dialog.
- Confirming restart calls /api/runtime/restart and the app comes back after container restart.
Acceptance Criteria
- All channel inbound messages, including generic sync webhook, use the strict bus-first path:
adapter -> ChannelRuntime.accept_inbound() -> MessageBus.inbound -> bridge -> AgentService.handle_inbound_message() -> MessageBus.outbound -> ChannelManager.dispatch_outbound() -> adapter.send()
- No adapter directly calls AgentService.
- No adapter directly publishes to MessageBus.
- New session ids always use <channel_id>:<account_id>:<peer_id>[:<thread_id>].
- Missing ChannelIdentity, missing account_id, or missing peer_id rejects inbound and records inbound_rejected.
- Runtime overwrites inconsistent inbound session_id and records session_id_normalized.
- Generic webhook accepts only fixed-schema text payloads.
- Generic webhook default response timeout is 1800 seconds.
- Generic webhook timeout returns HTTP 202 and does not cancel the backend run.
- Late generic webhook outbound is recorded as outbound_unclaimed.
- Dedupe cache returns pending/done/error for repeated message_id without rerunning the agent.
- Event log is persisted under workspace and does not store raw payloads or full conversation text.
- /api/status returns runtime channel status and runtime controls.
- /status displays compact channel status, details events on demand, and has a confirmed restart button.
- POST /api/runtime/restart is enabled by default and can be disabled with BEAVER_ENABLE_SELF_RESTART=0.
- AuthZ, real platform adapters, cron proactive delivery, streaming/progress, attachments, hot reload, and config UI are not implemented in v1.
V2 Backlog
- Telegram adapter.
- Slack adapter.
- WhatsApp adapter.
- Platform signature validation and later AuthZ integration if needed.
- Channel config UI with secret masking.
- Hot reload or adapter restart API.
- Cron proactive channel delivery.
- Attachment handling.
- Streaming, typing, and progress messages where platform supports them.
- Retry/replay UI for failed outbound delivery.
- Rich event filtering/search.
Execution Options
Plan complete and saved to docs/superpowers/plans/2026-06-01-channel-runtime-v1.md.
Two execution options:
- Subagent-Driven (recommended) - Dispatch a fresh subagent per task, review between tasks, fast iteration.
- Inline Execution - Execute tasks in this session using executing-plans, batch execution with checkpoints.
Which approach?