diff --git a/.gitignore b/.gitignore index 93541f0..40aa695 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ sessions/ **/.ruff_cache/ **/.mypy_cache/ **/.cache/ +**/.codegraph/ **/.venv/ **/dist/ **/build/ diff --git a/2026-06-01-hermes-gateway-llm-design.md b/2026-06-01-hermes-gateway-llm-design.md new file mode 100644 index 0000000..16749f9 --- /dev/null +++ b/2026-06-01-hermes-gateway-llm-design.md @@ -0,0 +1,177 @@ +# Hermes Gateway LLM Design + +Date: 2026-06-01 + +## Goal + +Replace the OpenAI-compatible LLM call path in `custom/custom_agent.py` with a LiveKit LLM +adapter that talks to NousResearch Hermes Agent through the OpenClaw gateway protocol. + +The integration must keep the existing custom agent behavior: + +- Chinese room-locator and general assistant instructions +- Emotion prefix parsing with `` +- Memory recall for room-locator queries +- Optional vision-frame attachment +- LiveKit ASR, TTS, VAD, turn handling, metrics, and interruption behavior + +The Hermes session strategy is `per_room`: one LiveKit room should map to one Hermes gateway +session for the lifetime of that room. + +## Non-Goals + +- Do not replace LiveKit `AgentSession`, ASR, TTS, VAD, or room I/O. +- Do not move room-locator classification into Hermes Agent. +- Do not implement Hermes-side tools in the first pass. +- Do not require an OpenAI-compatible proxy in front of the gateway. + +## Recommended Architecture + +Add a new custom LiveKit LLM implementation in `custom/hermes_gateway.py`. + +The adapter will implement the LiveKit `llm.LLM` interface and return a custom `LLMStream`. +The stream will own a single gateway request/response cycle while the LLM object owns the +per-room gateway session state. + +`custom/custom_agent.py` will continue to call `selected_llm.chat(...)` through +`_run_selected_llm()`. That preserves the existing `llm_node()` pipeline and keeps Hermes +behind the same abstraction as OpenAI-compatible models. + +## Components + +### HermesGatewayLLM + +Responsibilities: + +- Store gateway configuration: URL, auth token, agent identifier, request timeout, and reconnect + policy. +- Lazily create one Hermes gateway session per LiveKit room. +- Expose `model` as the configured Hermes agent/model identifier. +- Expose `provider` as `hermes-gateway`. +- Create `HermesGatewayLLMStream` from `chat(...)`. +- Close any persistent WebSocket/session resources in `aclose()`. + +### HermesGatewayLLMStream + +Responsibilities: + +- Serialize LiveKit `ChatContext` into the gateway request payload. +- Send the latest turn to the per-room Hermes session. +- Consume gateway events until the turn completes or fails. +- Yield LiveKit `llm.ChatChunk` objects for assistant text deltas. +- Surface recoverable connection failures through the normal LiveKit LLM error path. + +### custom_agent.py Wiring + +Add env-driven provider selection: + +- `CUSTOM_LLM_PROVIDER=openai` keeps the current behavior. +- `CUSTOM_LLM_PROVIDER=hermes_gateway` constructs `HermesGatewayLLM`. + +New Hermes-specific env vars: + +- `CUSTOM_HERMES_GATEWAY_URL` +- `CUSTOM_HERMES_API_KEY` +- `CUSTOM_HERMES_AGENT_ID` +- `CUSTOM_HERMES_SESSION_MODE=per_room` +- `CUSTOM_HERMES_REQUEST_TIMEOUT` +- `CUSTOM_HERMES_VERIFY_SSL` + +When `CUSTOM_LLM_PROVIDER=hermes_gateway`, `base_llm`, `text_llm`, and `vision_llm` should all +point at the same Hermes adapter. Separate Hermes text/vision agent IDs are out of scope for this +design. + +## Data Flow + +1. User speaks or sends text. +2. Existing LiveKit/STT flow updates `ChatContext`. +3. `CustomAgent.llm_node()` selects `general` or `room_locator`. +4. Existing code injects the appropriate instructions and emotion-prefix requirement. +5. Existing code optionally augments the latest user message with memory context. +6. Existing code optionally attaches a fresh vision frame. +7. `_run_selected_llm()` calls `HermesGatewayLLM.chat(...)`. +8. The Hermes adapter sends the request to the per-room gateway session. +9. Gateway text events are converted to `llm.ChatChunk` deltas. +10. Existing emotion observation and TTS stripping continue unchanged. + +## ChatContext Serialization + +Text messages should be serialized first. + +Supported LiveKit content: + +- `str`: send as normal message content. +- instruction/config updates: preserve the final active instructions as the leading instruction + message in the gateway payload. If the deployed gateway only accepts user/assistant messages, + prepend the instruction text to the latest user message before sending. +- image content: attempt to send through the gateway image/multimodal field. If the deployed + Hermes gateway rejects or ignores image content, log a warning and fall back to text-only + generation for that turn. + +Function tool calls should not be sent in the first implementation. If tool messages appear, log +that they were omitted. + +## per_room Session Lifecycle + +The adapter should derive a stable room key from the active LiveKit session or job context. If a +room name/SID is not available, fall back to one adapter-local session. + +For each room key: + +1. Open or reuse a gateway connection. +2. Send the gateway `connect` handshake if needed. +3. Create a Hermes session once. +4. Reuse that Hermes session for all future turns from the same room. +5. Close the gateway connection when the LiveKit LLM is closed. + +This lets Hermes maintain its own conversational state while LiveKit still keeps the visible +conversation history. + +## Gateway Event Mapping + +Map streaming text events to LiveKit chunks: + +- Gateway assistant text delta -> `llm.ChatChunk(delta=llm.ChoiceDelta(content=delta))` +- Gateway final assistant message -> emit any remaining text not already streamed +- Gateway usage metadata -> `llm.CompletionUsage` when token counts are available +- Gateway tool/action events -> log at debug/info level in the first implementation +- Gateway error event -> raise a LiveKit `APIError` or `APIConnectionError` +- Gateway completion event -> finish the async iterator + +The implementation should make the event parser tolerant of protocol field-name differences by +isolating event normalization in one helper function. Unknown event types should be logged and +ignored unless they indicate failure. + +## Error Handling + +- Missing Hermes env vars should fail fast at startup when provider is `hermes_gateway`. +- Gateway connect/session-create failures should raise connection errors. +- A failed request should not discard the per-room session unless the gateway reports that the + session is invalid or closed. +- If the gateway connection closes mid-turn, reconnect once and retry only if no assistant text + has been yielded yet. +- If assistant text has already been yielded, fail the turn instead of replaying partial output. + +## Testing + +Add focused tests around the adapter: + +- Serializes simple system/user/assistant chat context. +- Creates one gateway session and reuses it across two turns for the same room. +- Converts text deltas into `llm.ChatChunk` content. +- Handles final full-message events without duplicate text. +- Raises on gateway error events. +- Logs and skips unsupported image/tool content. + +Add a small wiring test or import-level test for `CUSTOM_LLM_PROVIDER=hermes_gateway` if the +custom module is testable without external services. + +## Rollout + +1. Implement the adapter behind `CUSTOM_LLM_PROVIDER=hermes_gateway`. +2. Keep `openai` as the default provider. +3. Run unit tests for the adapter and a syntax/type smoke check on `custom/custom_agent.py`. +4. Test manually with a local gateway using `python custom/custom_agent.py console` or the + existing LiveKit development mode. +5. If vision payloads are unsupported by the deployed gateway, document that the first Hermes + rollout is text-only for vision turns. diff --git a/docs/superpowers/plans/2026-06-01-channel-runtime-v1.md b/docs/superpowers/plans/2026-06-01-channel-runtime-v1.md new file mode 100644 index 0000000..fb2911a --- /dev/null +++ b/docs/superpowers/plans/2026-06-01-channel-runtime-v1.md @@ -0,0 +1,2356 @@ +# Channel Runtime V1 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build Beaver Channel Runtime v1: a local-configured, bus-first channel runtime with a generic text webhook reference adapter, stable channel identity/session mapping, dedupe, delivery/event logs, `/status` visibility, and a restart-instance control. + +**Architecture:** Channels are runtime integrations hosted inside the Web backend process. Every inbound message must pass through `MessageBus.inbound`, the gateway bridge, `AgentService.handle_inbound_message()`, `MessageBus.outbound`, `ChannelManager.dispatch_outbound()`, and adapter `send()`. Adapter code never calls `AgentService` directly and never publishes directly to the bus. + +**Tech Stack:** FastAPI/Python backend, dataclasses, local JSON/JSONL state under workspace, Next.js 13 app router, React, TypeScript, Vitest, pytest. + +--- + +## Locked Decisions + +- Channel v1 does not use AuthZ, OAuth, token introspection, permission scopes, pairing, allowlists, or platform signature validation. +- Channel config lives in local Beaver config under `BeaverConfig.channels`. +- All channel inbound messages run as chat/direct turns. Channel v1 does not create Tasks and does not support task prefixes. +- Session ids are generated only by the runtime from `ChannelIdentity`: + +```text +::[:] +``` + +- `channel_id`, `kind`, and `account_id` are separate concepts: + - `channel_id`: Beaver adapter instance id, for example `webhook-dev`. + - `kind`: adapter/platform kind, for example `webhook`. + - `account_id`: stable platform account/workspace/bot/phone identity. +- `ChannelIdentity.channel_id` is the canonical field. Existing `InboundMessage.channel` and `OutboundMessage.channel` remain for compatibility, but their semantic value is `channel_id`. +- Adapters do not hold `MessageBus`. They submit inbound messages through `ChannelInboundSink.accept_inbound()`. +- All enabled channels start automatically with the Web backend. Disabled channels are listed as disabled and are not instantiated. +- Config changes require backend restart in v1. No hot reload. +- `/status` gets a compact Channels section with details on demand. No independent channel configuration page in v1. +- Channel config UI is not included. Config is hand-edited in local config. +- V1 implements only a generic fixed-schema text webhook reference adapter. No Telegram, Slack, WhatsApp, email, or Outlook channel adapter in v1. +- Generic webhook waits synchronously for the final reply through the bus-first path. It defaults to a 30 minute response timeout. Timeout returns `202 accepted` and does not cancel the backend run. +- Generic webhook late outbound after request timeout is recorded as `outbound_unclaimed`. +- Dedupe result cache is persisted locally, defaults to 48 hours retention, and stores cached replies up to 20000 characters. +- Delivery failures are recorded but not retried automatically. +- Cron proactive channel delivery is not included in v1. Existing cron `deliver/channel/to` fields remain reserved. +- Streaming, typing indicators, tool progress, intermediate messages, attachment handling, multimedia, hot reload, config UI, and real platform adapters are v2 or later. +- `/status` restores a restart-instance button with confirmation. Restart uses local backend self-restart through `POST /api/runtime/restart`; it is enabled by default and can be disabled with `BEAVER_ENABLE_SELF_RESTART=0`. + +## File Structure + +- Modify: `app-instance/backend/beaver/foundation/config/schema.py` + - Add `ChannelConfig` and `channels: dict[str, ChannelConfig]`. +- Modify: `app-instance/backend/beaver/foundation/config/loader.py` + - Parse `channels` from local config. +- Modify: `app-instance/backend/tests/unit/test_config_loader.py` + - Cover channel config parsing. +- Modify: `app-instance/backend/beaver/foundation/events/message_bus.py` + - Add `ChannelIdentity`, add optional `channel_identity` to inbound/outbound messages, and add content type fields. +- Modify: `app-instance/backend/beaver/engine/context/builder.py` + - Extend `SessionContext` with normalized channel identity fields and render them into model-visible context. +- Modify: `app-instance/backend/beaver/engine/loop.py` + - Pass channel identity fields into `SessionContext`. +- Modify: `app-instance/backend/beaver/services/agent_service.py` + - Preserve `channel_identity` on outbound, pass normalized channel context to direct runs, and keep all channel runs in chat/direct mode. +- Modify: `app-instance/backend/beaver/interfaces/channels/base.py` + - Replace bus-owning adapter protocol with channel id, kind, mode, lifecycle, and outbound send. +- Modify: `app-instance/backend/beaver/interfaces/channels/manager.py` + - Dispatch by `channel.channel_id`; adapters no longer expose bus. +- Modify: `app-instance/backend/beaver/interfaces/channels/memory.py` + - Update test adapter to new identity/session model without old two-segment session ids. +- Create: `app-instance/backend/beaver/interfaces/channels/state.py` + - Persistent dedupe store and channel event log. +- Create: `app-instance/backend/beaver/interfaces/channels/generic_webhook.py` + - Fixed-schema generic text webhook reference adapter. +- Create: `app-instance/backend/beaver/interfaces/channels/runtime.py` + - `ChannelRuntime`, adapter factory, inbound bridge, outbound dispatch lifecycle, status and events API helpers. +- Modify: `app-instance/backend/beaver/interfaces/gateway/main.py` + - Turn `run_gateway()` into a compatibility wrapper around `ChannelRuntime`. +- Modify: `app-instance/backend/beaver/interfaces/web/app.py` + - Start/stop `ChannelRuntime`, add channel routes, expose channel status/events, add runtime restart API, include runtime controls in `/api/status`. +- Modify: `app-instance/nginx.conf` + - Add long proxy timeout for channel webhook API path. +- Modify: `app-instance/create-instance.sh` + - Set `BEAVER_ENABLE_SELF_RESTART=1` in created containers. +- Modify: `app-instance/frontend/types/index.ts` + - Expand channel and runtime control types. +- Modify: `app-instance/frontend/lib/api.ts` + - Add channel event and restart API helpers. +- Modify: `app-instance/frontend/app/(app)/status/page.tsx` + - Replace hard-coded channel chips with compact runtime channel table, details dialog, and restart confirmation button. +- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py` + - Update existing tests to new bus-first runtime semantics and add generic webhook roundtrip coverage. +- Create: `app-instance/backend/tests/unit/test_channel_runtime.py` + - Unit tests for identity/session mapping, dedupe, event log, adapter factory, and restart guard behavior. +- Modify/Create frontend tests as needed: + - `app-instance/frontend/lib/api` and status page behavior should be covered through existing Vitest setup or a focused component test if test harness supports it. + +## Data Contracts + +### Local Config Shape + +```json +{ + "channels": { + "webhook-dev": { + "enabled": true, + "kind": "webhook", + "mode": "webhook", + "accountId": "local", + "displayName": "Webhook Dev", + "config": { + "responseTimeoutSeconds": 1800, + "dedupeRetentionHours": 48, + "maxCachedReplyChars": 20000, + "maxCachedErrorChars": 4000 + }, + "secrets": {} + } + } +} +``` + +### Generic Webhook Request + +```json +{ + "peer_id": "demo-user", + "thread_id": "main", + "message_id": "msg-001", + "text": "hello", + "peer_type": "dm", + "user_id": "optional-user" +} +``` + +`channel_id`, `kind`, and `account_id` are read from config. The request body is not trusted for those fields. + +### Generic Webhook Response + +Success: + +```json +{ + "ok": true, + "duplicate": false, + "pending": false, + "session_id": "webhook-dev:local:demo-user:main", + "run_id": "run-1", + "reply": "assistant response" +} +``` + +Processing duplicate: + +```json +{ + "ok": true, + "duplicate": true, + "pending": true, + "session_id": "webhook-dev:local:demo-user:main" +} +``` + +Completed duplicate: + +```json +{ + "ok": true, + "duplicate": true, + "pending": false, + "session_id": "webhook-dev:local:demo-user:main", + "run_id": "run-1", + "reply": "assistant response" +} +``` + +Timed out waiting for reply: + +```json +{ + "ok": true, + "duplicate": false, + "pending": true, + "session_id": "webhook-dev:local:demo-user:main" +} +``` + +### Channel Status + +```json +{ + "channel_id": "webhook-dev", + "kind": "webhook", + "mode": "webhook", + "display_name": "Webhook Dev", + "enabled": true, + "state": "running", + "account_id": "local", + "last_error": null, + "last_event_at": "2026-06-01T10:00:00Z", + "started_at": "2026-06-01T09:59:00Z", + "capabilities": ["receive_text", "send_text", "sync_webhook_response"], + "webhook_url": "/api/channels/webhook-dev/webhook" +} +``` + +Allowed states: + +```text +configured +disabled +starting +running +degraded +error +stopped +``` + +### Channel Events + +Minimum event kinds: + +```text +adapter_started +adapter_stopped +webhook_received +inbound_accepted +inbound_duplicate +inbound_rejected +session_id_normalized +direct_run_started +direct_run_finished +direct_run_failed +outbound_delivered +outbound_delivery_failed +outbound_unclaimed +webhook_response_timeout +adapter_error +``` + +Event log entries do not store full user message content or raw webhook payloads. + +```json +{ + "event_id": "evt-1", + "channel_id": "webhook-dev", + "kind": "inbound_accepted", + "session_id": "webhook-dev:local:demo-user:main", + "message_id": "msg-001", + "run_id": null, + "status": "ok", + "error": null, + "text_preview": "hello", + "text_length": 5, + "created_at": "2026-06-01T10:00:00Z" +} +``` + +## Task 1: Channel Config Schema + +**Files:** +- Modify: `app-instance/backend/beaver/foundation/config/schema.py` +- Modify: `app-instance/backend/beaver/foundation/config/loader.py` +- Modify: `app-instance/backend/tests/unit/test_config_loader.py` + +- [ ] **Step 1: Write failing config loader test** + +Add this test to `app-instance/backend/tests/unit/test_config_loader.py`: + +```python +def test_config_loader_reads_channels(tmp_path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text( + json.dumps( + { + "agents": {"defaults": {"model": "openai/gpt-5"}}, + "channels": { + "webhook-dev": { + "enabled": True, + "kind": "webhook", + "mode": "webhook", + "accountId": "local", + "displayName": "Webhook Dev", + "config": { + "responseTimeoutSeconds": 1800, + "dedupeRetentionHours": 48, + }, + "secrets": {"ignored_for_status": "secret-value"}, + } + }, + } + ), + encoding="utf-8", + ) + + config = load_config(config_path=config_path) + + channel = config.channels["webhook-dev"] + assert channel.enabled is True + assert channel.kind == "webhook" + assert channel.mode == "webhook" + assert channel.account_id == "local" + assert channel.display_name == "Webhook Dev" + assert channel.config["response_timeout_seconds"] == 1800 + assert channel.config["dedupe_retention_hours"] == 48 + assert channel.secrets == {"ignored_for_status": "secret-value"} +``` + +- [ ] **Step 2: Run failing test** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q +``` + +Expected: fail because `BeaverConfig` has no `channels` field. + +- [ ] **Step 3: Add `ChannelConfig` to config schema** + +In `app-instance/backend/beaver/foundation/config/schema.py`, add: + +```python +@dataclass(slots=True) +class ChannelConfig: + """One configured channel adapter instance.""" + + enabled: bool = False + kind: str = "" + mode: str = "webhook" + account_id: str = "" + display_name: str = "" + config: dict[str, Any] = field(default_factory=dict) + secrets: dict[str, str] = field(default_factory=dict) +``` + +Then add this field to `BeaverConfig`: + +```python +channels: dict[str, ChannelConfig] = field(default_factory=dict) +``` + +- [ ] **Step 4: Parse channel config** + +In `app-instance/backend/beaver/foundation/config/loader.py`, add helper functions near the other config parsing helpers: + +```python +def _camel_to_snake_key(value: str) -> str: + result: list[str] = [] + for char in value: + if char.isupper() and result: + result.append("_") + result.append(char.lower()) + return "".join(result) + + +def _normalize_config_map(value: Any) -> dict[str, Any]: + if not isinstance(value, dict): + return {} + return { + _camel_to_snake_key(str(key)): item + for key, item in value.items() + if str(key).strip() + } + + +def _parse_channel_config(payload: Any) -> ChannelConfig: + if not isinstance(payload, dict): + return ChannelConfig() + secrets = payload.get("secrets", {}) + if not isinstance(secrets, dict): + secrets = {} + return ChannelConfig( + enabled=bool(payload.get("enabled", False)), + kind=_string(payload.get("kind")), + mode=_string(payload.get("mode")) or "webhook", + account_id=_string(payload.get("accountId") or payload.get("account_id")), + display_name=_string(payload.get("displayName") or payload.get("display_name")), + config=_normalize_config_map(payload.get("config")), + secrets={str(key): str(value) for key, value in secrets.items() if str(key).strip()}, + ) +``` + +Import `ChannelConfig` from schema, then wire it into `load_config()`: + +```python +channels_payload = data.get("channels", {}) +channels = { + str(channel_id): _parse_channel_config(payload) + for channel_id, payload in channels_payload.items() + if isinstance(channels_payload, dict) and str(channel_id).strip() +} +``` + +Pass `channels=channels` into `BeaverConfig(...)`. + +- [ ] **Step 5: Verify config test passes** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_config_loader.py::test_config_loader_reads_channels -q +``` + +Expected: pass. + +## Task 2: Channel Identity and Message Models + +**Files:** +- Modify: `app-instance/backend/beaver/foundation/events/message_bus.py` +- Modify: `app-instance/backend/beaver/engine/context/builder.py` +- Modify: `app-instance/backend/beaver/engine/loop.py` +- Modify: `app-instance/backend/beaver/services/agent_service.py` +- Modify: `app-instance/backend/tests/unit/test_imports.py` +- Create or update: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Write failing identity tests** + +Create `app-instance/backend/tests/unit/test_channel_runtime.py` with: + +```python +from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage + + +def test_channel_identity_builds_stable_session_id() -> None: + identity = ChannelIdentity( + channel_id="webhook-dev", + kind="webhook", + account_id="local", + peer_id="demo-user", + thread_id="main", + peer_type="dm", + message_id="msg-1", + ) + + assert identity.session_id() == "webhook-dev:local:demo-user:main" + assert identity.dedupe_key() == "webhook-dev:local:demo-user:main:msg-1" + + +def test_channel_identity_requires_routing_fields() -> None: + identity = ChannelIdentity(channel_id="webhook-dev", kind="webhook", account_id="", peer_id="demo") + + assert identity.validation_error() == "account_id is required" + + +def test_messages_carry_channel_identity() -> None: + identity = ChannelIdentity( + channel_id="webhook-dev", + kind="webhook", + account_id="local", + peer_id="demo-user", + message_id="msg-1", + ) + + inbound = InboundMessage(channel="webhook-dev", content="hello", channel_identity=identity) + outbound = OutboundMessage( + channel="webhook-dev", + content="ok", + session_id=identity.session_id(), + finish_reason="stop", + channel_identity=identity, + ) + + assert inbound.channel_identity is identity + assert outbound.channel_identity is identity +``` + +- [ ] **Step 2: Run failing tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_channel_identity_builds_stable_session_id tests/unit/test_channel_runtime.py::test_channel_identity_requires_routing_fields tests/unit/test_channel_runtime.py::test_messages_carry_channel_identity -q +``` + +Expected: fail because `ChannelIdentity` does not exist. + +- [ ] **Step 3: Implement identity model** + +In `app-instance/backend/beaver/foundation/events/message_bus.py`, add: + +```python +@dataclass(slots=True) +class ChannelIdentity: + """Normalized channel routing identity. + + channel_id is the Beaver adapter instance id, not the platform kind. + """ + + channel_id: str + kind: str + account_id: str + peer_id: str + thread_id: str | None = None + peer_type: str = "unknown" + user_id: str | None = None + message_id: str | None = None + + def validation_error(self) -> str | None: + if not self.channel_id.strip(): + return "channel_id is required" + if not self.account_id.strip(): + return "account_id is required" + if not self.peer_id.strip(): + return "peer_id is required" + return None + + def session_id(self) -> str: + parts = [self.channel_id, self.account_id, self.peer_id] + if self.thread_id: + parts.append(self.thread_id) + return ":".join(_clean_session_part(part) for part in parts) + + def dedupe_key(self) -> str | None: + if not self.message_id: + return None + return f"{self.session_id()}:{_clean_session_part(self.message_id)}" + + +def _clean_session_part(value: str) -> str: + cleaned = str(value).strip() + if not cleaned: + return "unknown" + return cleaned.replace(":", "_") +``` + +Add fields to `InboundMessage`: + +```python +content_type: str = "text" +channel_identity: ChannelIdentity | None = None +``` + +Add fields to `OutboundMessage`: + +```python +content_type: str = "text" +channel_identity: ChannelIdentity | None = None +``` + +Update `app-instance/backend/beaver/foundation/events/__init__.py` to export `ChannelIdentity`. + +- [ ] **Step 4: Preserve identity in AgentService outbound** + +In `AgentService.build_outbound_message()`, add: + +```python +channel_identity=inbound.channel_identity, +content_type=inbound.content_type, +``` + +In `AgentService.build_outbound_error()`, add the same two fields. + +- [ ] **Step 5: Pass channel context into direct runs** + +In `AgentService.handle_inbound_message()`, pass normalized channel fields: + +```python +channel_identity = inbound.channel_identity +result = await self.submit_direct( + inbound.content, + session_id=inbound.session_id, + source=f"gateway:{inbound.channel}", + user_id=inbound.user_id or (channel_identity.user_id if channel_identity else None), + title=inbound.title, + execution_context=inbound.execution_context, + model=inbound.model, + provider_name=inbound.provider_name, + embedding_model=inbound.embedding_model, + channel_identity=channel_identity, +) +``` + +In `AgentLoop._process_direct_impl()`, add a keyword parameter: + +```python +channel_identity: ChannelIdentity | None = None, +``` + +Import `ChannelIdentity`. When constructing `SessionContext`, include: + +```python +channel=channel_identity.channel_id if channel_identity else None, +channel_kind=channel_identity.kind if channel_identity else None, +account_id=channel_identity.account_id if channel_identity else None, +peer_id=channel_identity.peer_id if channel_identity else None, +peer_type=channel_identity.peer_type if channel_identity else None, +chat_id=channel_identity.peer_id if channel_identity else None, +thread_id=channel_identity.thread_id if channel_identity else None, +``` + +- [ ] **Step 6: Extend `SessionContext` rendering** + +In `app-instance/backend/beaver/engine/context/builder.py`, add fields: + +```python +channel_kind: str | None = None +account_id: str | None = None +peer_id: str | None = None +peer_type: str | None = None +thread_id: str | None = None +``` + +In `_render_session_section()`, add: + +```python +if session_context.channel_kind: + rows.append(f"Channel Kind: {session_context.channel_kind}") +if session_context.account_id: + rows.append(f"Account ID: {session_context.account_id}") +if session_context.peer_id: + rows.append(f"Peer ID: {session_context.peer_id}") +if session_context.peer_type: + rows.append(f"Peer Type: {session_context.peer_type}") +if session_context.thread_id: + rows.append(f"Thread ID: {session_context.thread_id}") +``` + +Do not render `raw_channel_payload`. + +- [ ] **Step 7: Verify identity tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_imports.py -q +``` + +Expected: pass after updating import assertions if needed. + +## Task 3: Dedupe Store and Event Log + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/state.py` +- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Add failing state tests** + +Append to `test_channel_runtime.py`: + +```python +from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog + + +def test_dedupe_store_tracks_processing_and_done(tmp_path) -> None: + store = ChannelDedupeStore(tmp_path / "dedupe.json", retention_hours=48) + + created = store.mark_processing( + dedupe_key="webhook-dev:local:demo:msg-1", + session_id="webhook-dev:local:demo", + message_id="msg-1", + ) + duplicate = store.mark_processing( + dedupe_key="webhook-dev:local:demo:msg-1", + session_id="webhook-dev:local:demo", + message_id="msg-1", + ) + + assert created.created is True + assert duplicate.created is False + assert duplicate.record is not None + assert duplicate.record["status"] == "processing" + + store.mark_done( + dedupe_key="webhook-dev:local:demo:msg-1", + run_id="run-1", + reply="hello" * 10000, + max_reply_chars=20, + ) + + done = store.get("webhook-dev:local:demo:msg-1") + assert done is not None + assert done["status"] == "done" + assert done["reply"] == "hellohellohellohello" + + +def test_channel_event_log_writes_recent_events(tmp_path) -> None: + log = ChannelEventLog(tmp_path / "events.jsonl") + log.record( + channel_id="webhook-dev", + kind="inbound_accepted", + session_id="webhook-dev:local:demo", + message_id="msg-1", + status="ok", + text="hello world", + ) + + events = log.recent(channel_id="webhook-dev", limit=10) + + assert len(events) == 1 + assert events[0]["kind"] == "inbound_accepted" + assert events[0]["text_preview"] == "hello world" + assert "raw_channel_payload" not in events[0] +``` + +- [ ] **Step 2: Run failing state tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q +``` + +Expected: fail because `state.py` does not exist. + +- [ ] **Step 3: Implement `state.py`** + +Create `app-instance/backend/beaver/interfaces/channels/state.py` with: + +```python +from __future__ import annotations + +import json +import time +from dataclasses import dataclass +from pathlib import Path +from threading import Lock +from typing import Any +from uuid import uuid4 + + +def _now_ms() -> int: + return int(time.time() * 1000) + + +def _iso_now() -> str: + from datetime import datetime, timezone + + return datetime.now(timezone.utc).isoformat() + + +@dataclass(slots=True) +class DedupeWriteResult: + created: bool + record: dict[str, Any] | None = None + + +class ChannelDedupeStore: + def __init__(self, path: Path, *, retention_hours: int = 48) -> None: + self.path = path + self.retention_ms = max(1, int(retention_hours)) * 60 * 60 * 1000 + self._lock = Lock() + + def _load(self) -> dict[str, Any]: + if not self.path.exists(): + return {"records": {}} + try: + data = json.loads(self.path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {"records": {}} + return data if isinstance(data, dict) and isinstance(data.get("records"), dict) else {"records": {}} + + def _save(self, data: dict[str, Any]) -> None: + self.path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = self.path.with_name(f"{self.path.name}.tmp") + tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + tmp_path.replace(self.path) + + def _prune_unlocked(self, data: dict[str, Any], now_ms: int) -> None: + records = data.get("records", {}) + expired_before = now_ms - self.retention_ms + for key, record in list(records.items()): + updated_at_ms = int(record.get("updated_at_ms") or record.get("created_at_ms") or 0) + if updated_at_ms < expired_before: + records.pop(key, None) + + def get(self, dedupe_key: str) -> dict[str, Any] | None: + with self._lock: + data = self._load() + self._prune_unlocked(data, _now_ms()) + return data["records"].get(dedupe_key) + + def mark_processing(self, *, dedupe_key: str, session_id: str, message_id: str) -> DedupeWriteResult: + with self._lock: + data = self._load() + now_ms = _now_ms() + self._prune_unlocked(data, now_ms) + existing = data["records"].get(dedupe_key) + if existing is not None: + self._save(data) + return DedupeWriteResult(created=False, record=existing) + record = { + "dedupe_key": dedupe_key, + "status": "processing", + "session_id": session_id, + "message_id": message_id, + "run_id": None, + "reply": None, + "error": None, + "created_at_ms": now_ms, + "updated_at_ms": now_ms, + } + data["records"][dedupe_key] = record + self._save(data) + return DedupeWriteResult(created=True, record=record) + + def mark_done(self, *, dedupe_key: str, run_id: str | None, reply: str, max_reply_chars: int) -> None: + self._mark_result( + dedupe_key=dedupe_key, + status="done", + run_id=run_id, + reply=reply[: max(0, int(max_reply_chars))], + error=None, + ) + + def mark_error(self, *, dedupe_key: str, error: str, max_error_chars: int) -> None: + self._mark_result( + dedupe_key=dedupe_key, + status="error", + run_id=None, + reply=None, + error=error[: max(0, int(max_error_chars))], + ) + + def _mark_result( + self, + *, + dedupe_key: str, + status: str, + run_id: str | None, + reply: str | None, + error: str | None, + ) -> None: + with self._lock: + data = self._load() + record = data["records"].get(dedupe_key) + if record is None: + record = {"dedupe_key": dedupe_key, "created_at_ms": _now_ms()} + data["records"][dedupe_key] = record + record.update( + { + "status": status, + "run_id": run_id, + "reply": reply, + "error": error, + "updated_at_ms": _now_ms(), + } + ) + self._save(data) + + +class ChannelEventLog: + def __init__(self, path: Path) -> None: + self.path = path + self._lock = Lock() + + def record( + self, + *, + channel_id: str, + kind: str, + session_id: str | None = None, + message_id: str | None = None, + run_id: str | None = None, + status: str = "ok", + error: str | None = None, + text: str | None = None, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + entry = { + "event_id": uuid4().hex, + "channel_id": channel_id, + "kind": kind, + "session_id": session_id, + "message_id": message_id, + "run_id": run_id, + "status": status, + "error": error, + "text_preview": (text or "")[:120] if text else None, + "text_length": len(text) if text else 0, + "metadata": metadata or {}, + "created_at": _iso_now(), + } + with self._lock: + self.path.parent.mkdir(parents=True, exist_ok=True) + with self.path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(entry, ensure_ascii=False) + "\n") + return entry + + def recent(self, *, channel_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]: + if not self.path.exists(): + return [] + lines = self.path.read_text(encoding="utf-8").splitlines() + items: list[dict[str, Any]] = [] + for line in reversed(lines): + try: + item = json.loads(line) + except json.JSONDecodeError: + continue + if channel_id and item.get("channel_id") != channel_id: + continue + items.append(item) + if len(items) >= max(1, int(limit)): + break + return list(reversed(items)) +``` + +- [ ] **Step 4: Verify state tests pass** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_dedupe_store_tracks_processing_and_done tests/unit/test_channel_runtime.py::test_channel_event_log_writes_recent_events -q +``` + +Expected: pass. + +## Task 4: Adapter Protocol and ChannelManager Migration + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/channels/base.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/manager.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/memory.py` +- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py` +- Modify: `app-instance/backend/tests/unit/test_imports.py` + +- [ ] **Step 1: Write failing manager tests for new protocol** + +Update the manager-focused parts of `test_gateway_channels.py` to use `channel_id` instead of `name` and no adapter bus: + +```python +def test_channel_manager_dispatches_by_channel_id() -> None: + class CaptureChannel: + channel_id = "webhook-dev" + kind = "webhook" + mode = "webhook" + + def __init__(self) -> None: + self.sent = [] + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def send(self, message: Any) -> None: + self.sent.append(message) + + async def run() -> None: + bus = MessageBus() + channel = CaptureChannel() + manager = ChannelManager(bus) + manager.register(channel) + await bus.publish_outbound( + OutboundMessage( + channel="webhook-dev", + content="ok", + session_id="webhook-dev:local:demo", + finish_reason="stop", + ) + ) + stop_event = asyncio.Event() + stop_event.set() + + await manager.dispatch_outbound(stop_event) + + assert channel.sent[0].content == "ok" + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run failing manager test** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_gateway_channels.py::test_channel_manager_dispatches_by_channel_id -q +``` + +Expected: fail because current protocol uses `name` and bus checks. + +- [ ] **Step 3: Update adapter protocol** + +Replace `ChannelAdapter` in `base.py` with: + +```python +class ChannelAdapter(Protocol): + """Minimal contract every runtime channel adapter must implement.""" + + channel_id: str + kind: str + mode: str + + async def start(self) -> None: + """Prepare the channel before messages are routed.""" + + async def stop(self) -> None: + """Stop accepting/routing channel messages.""" + + async def send(self, message: OutboundMessage) -> None: + """Deliver an outbound message to the concrete channel.""" + + +class ChannelInboundSink(Protocol): + async def accept_inbound(self, message: InboundMessage) -> Any: + """Accept a normalized inbound message from an adapter.""" +``` + +Import `InboundMessage`. + +- [ ] **Step 4: Update ChannelManager** + +In `manager.py`: + +```python +def register(self, channel: ChannelAdapter) -> None: + if self.started: + raise RuntimeError("Cannot register channels after ChannelManager.start()") + if channel.channel_id in self.channels: + raise ValueError(f"Channel already registered: {channel.channel_id}") + self.channels[channel.channel_id] = channel +``` + +The rest of dispatch remains keyed by `message.channel`. + +- [ ] **Step 5: Migrate memory adapter to new protocol** + +Change `MemoryChannelAdapter.__init__` signature: + +```python +def __init__( + self, + inbound_sink: ChannelInboundSink, + *, + channel_id: str = "memory-dev", + kind: str = "memory", + mode: str = "webhook", + account_id: str = "memory", +) -> None: + self.channel_id = channel_id + self.kind = kind + self.mode = mode + self.account_id = account_id + self.inbound_sink = inbound_sink + self.started = False + self.sent_messages: list[OutboundMessage] = [] +``` + +Update `publish_external_text()` to build `ChannelIdentity` and call `self.inbound_sink.accept_inbound(message)`. + +- [ ] **Step 6: Verify existing gateway tests after migration** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_imports.py -q +``` + +Expected: some old tests will need assertion updates to new session id format. Update all expected session ids to `::[:]` and remove old `telegram:chat-1` expectations. + +## Task 5: ChannelRuntime Core + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/runtime.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/__init__.py` +- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Add failing runtime tests** + +Append: + +```python +from beaver.foundation.config.schema import ChannelConfig +from beaver.foundation.events import MessageBus +from beaver.interfaces.channels.runtime import ChannelRuntime + + +class FakeAgentService: + is_running = True + + async def handle_inbound_message(self, inbound): + return OutboundMessage( + message_id=inbound.message_id, + channel=inbound.channel, + content=f"echo:{inbound.content}", + session_id=inbound.session_id, + finish_reason="stop", + channel_identity=inbound.channel_identity, + ) + + +def test_channel_runtime_accept_inbound_normalizes_session_and_dedupes(tmp_path) -> None: + async def run() -> None: + bus = MessageBus() + runtime = ChannelRuntime( + service=FakeAgentService(), + bus=bus, + workspace=tmp_path, + channels={}, + ) + identity = ChannelIdentity( + channel_id="webhook-dev", + kind="webhook", + account_id="local", + peer_id="demo", + message_id="msg-1", + ) + result = await runtime.accept_inbound( + InboundMessage( + channel="webhook-dev", + content="hello", + session_id="wrong", + channel_identity=identity, + ) + ) + duplicate = await runtime.accept_inbound( + InboundMessage( + channel="webhook-dev", + content="hello", + channel_identity=identity, + ) + ) + + queued = await bus.consume_inbound() + assert result.accepted is True + assert queued.session_id == "webhook-dev:local:demo" + assert duplicate.accepted is False + assert duplicate.duplicate is True + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run failing runtime test** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_accept_inbound_normalizes_session_and_dedupes -q +``` + +Expected: fail because `ChannelRuntime` does not exist. + +- [ ] **Step 3: Implement runtime result models and constructor** + +Create `runtime.py` with: + +```python +from __future__ import annotations + +import asyncio +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from beaver.foundation.config.schema import ChannelConfig +from beaver.foundation.events import InboundMessage, MessageBus +from beaver.interfaces.channels import ChannelAdapter, ChannelManager +from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog +from beaver.services.agent_service import AgentService + + +@dataclass(slots=True) +class ChannelAcceptResult: + accepted: bool + duplicate: bool = False + pending: bool = False + rejected: bool = False + session_id: str | None = None + dedupe_key: str | None = None + record: dict[str, Any] | None = None + error: str | None = None + + +class ChannelRuntime: + def __init__( + self, + *, + service: AgentService, + workspace: Path, + channels: dict[str, ChannelConfig], + bus: MessageBus | None = None, + ) -> None: + self.service = service + self.workspace = workspace + self.bus = bus or MessageBus() + self.manager = ChannelManager(self.bus) + self.channel_configs = dict(channels) + self.adapters: dict[str, ChannelAdapter] = {} + self.states: dict[str, dict[str, Any]] = {} + state_dir = workspace / "state" / "channels" + self.dedupe = ChannelDedupeStore(state_dir / "dedupe.json") + self.events = ChannelEventLog(state_dir / "events.jsonl") + self._bridge_task: asyncio.Task[None] | None = None + self._dispatch_task: asyncio.Task[None] | None = None + self._stop_event = asyncio.Event() + self._dispatch_stop_event = asyncio.Event() +``` + +- [ ] **Step 4: Implement `accept_inbound()`** + +Add: + +```python + async def accept_inbound(self, message: InboundMessage) -> ChannelAcceptResult: + identity = message.channel_identity + if identity is None: + self.events.record(channel_id=message.channel, kind="inbound_rejected", status="error", error="channel_identity is required") + return ChannelAcceptResult(accepted=False, rejected=True, error="channel_identity is required") + validation_error = identity.validation_error() + if validation_error: + self.events.record(channel_id=identity.channel_id, kind="inbound_rejected", status="error", error=validation_error) + return ChannelAcceptResult(accepted=False, rejected=True, error=validation_error) + + expected_session_id = identity.session_id() + if message.session_id != expected_session_id: + self.events.record( + channel_id=identity.channel_id, + kind="session_id_normalized", + session_id=expected_session_id, + message_id=identity.message_id, + status="ok", + ) + message.session_id = expected_session_id + message.channel = identity.channel_id + + dedupe_key = identity.dedupe_key() + if dedupe_key: + write = self.dedupe.mark_processing( + dedupe_key=dedupe_key, + session_id=expected_session_id, + message_id=identity.message_id or "", + ) + if not write.created: + record = write.record or {} + self.events.record( + channel_id=identity.channel_id, + kind="inbound_duplicate", + session_id=expected_session_id, + message_id=identity.message_id, + status=str(record.get("status") or "processing"), + ) + return ChannelAcceptResult( + accepted=False, + duplicate=True, + pending=record.get("status") == "processing", + session_id=expected_session_id, + dedupe_key=dedupe_key, + record=record, + ) + + self.events.record( + channel_id=identity.channel_id, + kind="inbound_accepted", + session_id=expected_session_id, + message_id=identity.message_id, + status="ok", + text=message.content, + ) + await self.bus.publish_inbound(message) + return ChannelAcceptResult( + accepted=True, + session_id=expected_session_id, + dedupe_key=dedupe_key, + ) +``` + +- [ ] **Step 5: Implement bridge and dispatch lifecycle** + +Add: + +```python + async def start(self) -> None: + self._stop_event.clear() + self._dispatch_stop_event.clear() + self._bridge_task = asyncio.create_task(self._bridge_inbound_to_agent()) + self._dispatch_task = asyncio.create_task(self.manager.dispatch_outbound(self._dispatch_stop_event)) + + async def stop(self) -> None: + self._stop_event.set() + self._dispatch_stop_event.set() + for task in (self._bridge_task, self._dispatch_task): + if task is None: + continue + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + await self.manager.stop() + + async def _bridge_inbound_to_agent(self) -> None: + while not self._stop_event.is_set(): + try: + inbound = await asyncio.wait_for(self.bus.consume_inbound(), timeout=0.25) + except asyncio.TimeoutError: + continue + identity = inbound.channel_identity + try: + self.events.record( + channel_id=inbound.channel, + kind="direct_run_started", + session_id=inbound.session_id, + message_id=identity.message_id if identity else inbound.message_id, + ) + outbound = await self.service.handle_inbound_message(inbound) + except Exception as exc: + self.events.record( + channel_id=inbound.channel, + kind="direct_run_failed", + session_id=inbound.session_id, + message_id=identity.message_id if identity else inbound.message_id, + status="error", + error=str(exc), + ) + outbound = AgentService.build_outbound_error(inbound, detail=str(exc), finish_reason="error") + else: + self.events.record( + channel_id=outbound.channel, + kind="direct_run_finished", + session_id=outbound.session_id, + message_id=identity.message_id if identity else inbound.message_id, + run_id=outbound.run_id, + ) + await self.bus.publish_outbound(outbound) +``` + +- [ ] **Step 6: Verify runtime tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py -q +``` + +Expected: pass for identity, state, and basic runtime tests. + +## Task 6: Generic Webhook Reference Adapter + +**Files:** +- Create: `app-instance/backend/beaver/interfaces/channels/generic_webhook.py` +- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py` +- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Add failing generic webhook roundtrip test** + +Append: + +```python +from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter + + +def test_generic_webhook_adapter_waits_for_outbound_reply(tmp_path) -> None: + async def run() -> None: + bus = MessageBus() + runtime = ChannelRuntime( + service=FakeAgentService(), + bus=bus, + workspace=tmp_path, + channels={}, + ) + adapter = GenericWebhookAdapter( + channel_id="webhook-dev", + kind="webhook", + mode="webhook", + account_id="local", + display_name="Webhook Dev", + inbound_sink=runtime, + response_timeout_seconds=1, + ) + runtime.manager.register(adapter) + await runtime.start() + try: + response = await adapter.handle_webhook_payload( + { + "peer_id": "demo", + "message_id": "msg-1", + "text": "hello", + "peer_type": "dm", + } + ) + finally: + await runtime.stop() + + assert response["ok"] is True + assert response["reply"] == "echo:hello" + assert response["session_id"] == "webhook-dev:local:demo" + + asyncio.run(run()) +``` + +- [ ] **Step 2: Run failing adapter test** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q +``` + +Expected: fail because generic adapter does not exist. + +- [ ] **Step 3: Implement generic adapter** + +Create `generic_webhook.py`: + +```python +from __future__ import annotations + +import asyncio +from typing import Any + +from beaver.foundation.events import ChannelIdentity, InboundMessage, OutboundMessage +from beaver.interfaces.channels.base import ChannelInboundSink + + +class GenericWebhookAdapter: + def __init__( + self, + *, + channel_id: str, + kind: str, + mode: str, + account_id: str, + display_name: str = "", + inbound_sink: ChannelInboundSink, + response_timeout_seconds: float = 1800, + ) -> None: + self.channel_id = channel_id + self.kind = kind + self.mode = mode + self.account_id = account_id + self.display_name = display_name or channel_id + self.inbound_sink = inbound_sink + self.response_timeout_seconds = max(1.0, float(response_timeout_seconds)) + self.started = False + self._pending: dict[str, asyncio.Future[OutboundMessage]] = {} + + async def start(self) -> None: + self.started = True + + async def stop(self) -> None: + self.started = False + for future in list(self._pending.values()): + if not future.done(): + future.cancel() + self._pending.clear() + + async def handle_webhook_payload(self, payload: dict[str, Any]) -> dict[str, Any]: + text = str(payload.get("text") or "").strip() + peer_id = str(payload.get("peer_id") or "").strip() + message_id = str(payload.get("message_id") or "").strip() + thread_id = str(payload.get("thread_id") or "").strip() or None + peer_type = str(payload.get("peer_type") or "unknown").strip() or "unknown" + user_id = str(payload.get("user_id") or "").strip() or None + if not text: + return {"ok": False, "error": "text is required"} + if not peer_id: + return {"ok": False, "error": "peer_id is required"} + if not message_id: + return {"ok": False, "error": "message_id is required"} + + identity = ChannelIdentity( + channel_id=self.channel_id, + kind=self.kind, + account_id=self.account_id, + peer_id=peer_id, + thread_id=thread_id, + peer_type=peer_type, + user_id=user_id, + message_id=message_id, + ) + inbound = InboundMessage( + channel=self.channel_id, + content=text, + user_id=user_id, + channel_identity=identity, + metadata={"webhook": {"peer_type": peer_type}}, + ) + future = asyncio.get_running_loop().create_future() + self._pending[inbound.message_id] = future + accept = await self.inbound_sink.accept_inbound(inbound) + if not accept.accepted: + self._pending.pop(inbound.message_id, None) + record = accept.record or {} + return { + "ok": accept.error is None, + "duplicate": accept.duplicate, + "pending": accept.pending, + "session_id": accept.session_id, + "status": record.get("status"), + "run_id": record.get("run_id"), + "reply": record.get("reply"), + "error": accept.error or record.get("error"), + } + try: + outbound = await asyncio.wait_for(future, timeout=self.response_timeout_seconds) + except asyncio.TimeoutError: + self._pending.pop(inbound.message_id, None) + return { + "ok": True, + "duplicate": False, + "pending": True, + "session_id": accept.session_id, + } + return { + "ok": outbound.finish_reason != "error", + "duplicate": False, + "pending": False, + "session_id": outbound.session_id, + "run_id": outbound.run_id, + "reply": outbound.content, + "error": outbound.metadata.get("error"), + } + + async def send(self, message: OutboundMessage) -> None: + future = self._pending.pop(message.message_id, None) + if future is None or future.done(): + return + future.set_result(message) +``` + +- [ ] **Step 4: Mark dedupe done/error from runtime dispatch** + +In `ChannelRuntime._bridge_inbound_to_agent()`, after outbound is created, if inbound has `dedupe_key`, mark done/error: + +```python +dedupe_key = identity.dedupe_key() if identity else None +if dedupe_key: + if outbound.finish_reason == "error": + self.dedupe.mark_error( + dedupe_key=dedupe_key, + error=outbound.content, + max_error_chars=4000, + ) + else: + self.dedupe.mark_done( + dedupe_key=dedupe_key, + run_id=outbound.run_id, + reply=outbound.content, + max_reply_chars=20000, + ) +``` + +- [ ] **Step 5: Record delivery events in ChannelManager or runtime wrapper** + +Wrap adapter `send()` in `ChannelManager.dispatch_outbound()` or introduce a runtime-managed dispatch callback. V1 minimal implementation can keep the manager catch and append `undeliverable`, but `ChannelRuntime` must expose delivery events. If manager remains event-unaware, add optional callback: + +```python +async def dispatch_outbound( + self, + stop_event: asyncio.Event, + on_delivered: Callable[[OutboundMessage], Awaitable[None]] | None = None, + on_failed: Callable[[OutboundMessage, Exception | None], Awaitable[None]] | None = None, +) -> None: +``` + +Use callbacks in `ChannelRuntime.start()` to record `outbound_delivered` and `outbound_delivery_failed`. + +- [ ] **Step 6: Verify generic webhook roundtrip** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_generic_webhook_adapter_waits_for_outbound_reply -q +``` + +Expected: pass. + +## Task 7: Runtime Adapter Factory and Status + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/channels/runtime.py` +- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Add failing status/factory test** + +Append: + +```python +def test_channel_runtime_starts_enabled_generic_webhook_and_reports_status(tmp_path) -> None: + async def run() -> None: + runtime = ChannelRuntime( + service=FakeAgentService(), + workspace=tmp_path, + channels={ + "webhook-dev": ChannelConfig( + enabled=True, + kind="webhook", + mode="webhook", + account_id="local", + display_name="Webhook Dev", + config={"response_timeout_seconds": 1800}, + ), + "off": ChannelConfig( + enabled=False, + kind="webhook", + mode="webhook", + account_id="local", + ), + }, + ) + await runtime.start() + try: + statuses = runtime.statuses() + finally: + await runtime.stop() + + by_id = {item["channel_id"]: item for item in statuses} + assert by_id["webhook-dev"]["state"] == "running" + assert by_id["webhook-dev"]["webhook_url"] == "/api/channels/webhook-dev/webhook" + assert by_id["off"]["state"] == "disabled" + + asyncio.run(run()) +``` + +- [ ] **Step 2: Implement adapter creation** + +In `ChannelRuntime.start()`, before starting bridge/dispatch tasks: + +```python +for channel_id, cfg in self.channel_configs.items(): + if not cfg.enabled: + self.states[channel_id] = {"state": "disabled", "last_error": None} + continue + try: + adapter = self._build_adapter(channel_id, cfg) + self.adapters[channel_id] = adapter + self.manager.register(adapter) + await adapter.start() + self.states[channel_id] = {"state": "running", "last_error": None, "started_at": _iso_now()} + self.events.record(channel_id=channel_id, kind="adapter_started") + except Exception as exc: + self.states[channel_id] = {"state": "error", "last_error": str(exc)} + self.events.record(channel_id=channel_id, kind="adapter_error", status="error", error=str(exc)) +``` + +Add `_build_adapter()`: + +```python + def _build_adapter(self, channel_id: str, cfg: ChannelConfig) -> ChannelAdapter: + if cfg.kind != "webhook" or cfg.mode != "webhook": + raise ValueError(f"Unsupported channel kind/mode: {cfg.kind}/{cfg.mode}") + from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter + + return GenericWebhookAdapter( + channel_id=channel_id, + kind=cfg.kind, + mode=cfg.mode, + account_id=cfg.account_id, + display_name=cfg.display_name, + inbound_sink=self, + response_timeout_seconds=float(cfg.config.get("response_timeout_seconds") or 1800), + ) +``` + +- [ ] **Step 3: Implement status helpers** + +Add: + +```python + def statuses(self) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + recent = self.events.recent(limit=500) + last_by_channel = {event["channel_id"]: event for event in recent if event.get("channel_id")} + for channel_id, cfg in self.channel_configs.items(): + state = self.states.get(channel_id, {"state": "configured", "last_error": None}) + items.append( + { + "channel_id": channel_id, + "name": channel_id, + "kind": cfg.kind, + "mode": cfg.mode, + "display_name": cfg.display_name or channel_id, + "enabled": cfg.enabled, + "state": state.get("state", "configured"), + "account_id": cfg.account_id, + "last_error": state.get("last_error"), + "started_at": state.get("started_at"), + "last_event_at": last_by_channel.get(channel_id, {}).get("created_at"), + "capabilities": ["receive_text", "send_text", "sync_webhook_response"] + if cfg.kind == "webhook" + else [], + "webhook_url": f"/api/channels/{channel_id}/webhook" if cfg.kind == "webhook" else None, + } + ) + return items + + def recent_events(self, channel_id: str, *, limit: int = 100) -> list[dict[str, Any]]: + return self.events.recent(channel_id=channel_id, limit=limit) +``` + +- [ ] **Step 4: Verify status/factory tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_channel_runtime_starts_enabled_generic_webhook_and_reports_status -q +``` + +Expected: pass. + +## Task 8: Web Backend Integration and Routes + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/web/app.py` +- Modify: `app-instance/backend/beaver/interfaces/gateway/main.py` +- Modify: `app-instance/backend/tests/unit/test_gateway_channels.py` + +- [ ] **Step 1: Add failing Web API route test** + +Add to `test_gateway_channels.py` or a focused web app test file: + +```python +def test_web_app_exposes_channel_status_and_webhook(tmp_path) -> None: + from fastapi.testclient import TestClient + from beaver.foundation.config.loader import load_config + from beaver.interfaces.web.app import create_app + + config_path = tmp_path / "config.json" + workspace = tmp_path / "workspace" + workspace.mkdir() + config_path.write_text( + json.dumps( + { + "agents": {"defaults": {"model": "openai/gpt-5"}}, + "providers": {}, + "channels": { + "webhook-dev": { + "enabled": True, + "kind": "webhook", + "mode": "webhook", + "accountId": "local", + } + }, + } + ), + encoding="utf-8", + ) + + app = create_app(workspace=workspace, config_path=config_path) + with TestClient(app) as client: + status = client.get("/api/status").json() + assert status["channels"][0]["channel_id"] == "webhook-dev" + assert status["channels"][0]["state"] == "running" +``` + +Use fake provider/service injection if the default app startup requires external provider calls. The test should not call a real LLM. + +- [ ] **Step 2: Start ChannelRuntime in lifespan** + +In `_app_lifespan`, after `AgentService.start()` and config boot, create and start runtime: + +```python +loaded = attached_service.create_loop().boot() +channel_runtime = ChannelRuntime( + service=attached_service, + workspace=loaded.workspace, + channels=loaded.config.channels, +) +fastapi_app.state.channel_runtime = channel_runtime +await channel_runtime.start() +``` + +On shutdown, stop it before or alongside cron shutdown: + +```python +runtime = getattr(fastapi_app.state, "channel_runtime", None) +if runtime is not None: + await runtime.stop() +``` + +- [ ] **Step 3: Add route helpers** + +Add: + +```python +def get_channel_runtime(request: Request) -> ChannelRuntime: + runtime = getattr(request.app.state, "channel_runtime", None) + if runtime is None: + raise HTTPException(status_code=503, detail="Channel runtime is not running") + return runtime +``` + +Routes: + +```python +@app.get("/api/channels") +async def list_channels(request: Request) -> list[dict[str, Any]]: + return get_channel_runtime(request).statuses() + + +@app.get("/api/channels/{channel_id}/events") +async def list_channel_events(channel_id: str, request: Request, limit: int = 100) -> list[dict[str, Any]]: + return get_channel_runtime(request).recent_events(channel_id, limit=limit) + + +@app.post("/api/channels/{channel_id}/webhook") +async def post_channel_webhook(channel_id: str, request: Request) -> JSONResponse: + runtime = get_channel_runtime(request) + adapter = runtime.adapters.get(channel_id) + if adapter is None or not hasattr(adapter, "handle_webhook_payload"): + raise HTTPException(status_code=404, detail="Webhook channel not found") + payload = await request.json() + if not isinstance(payload, dict): + raise HTTPException(status_code=400, detail="Webhook payload must be a JSON object") + result = await adapter.handle_webhook_payload(payload) + status_code = 202 if result.get("pending") else 200 + return JSONResponse(result, status_code=status_code) +``` + +Also support `GET /api/channels/{channel_id}/webhook` only if needed later for platform challenge. Generic v1 can omit GET. + +- [ ] **Step 4: Include runtime controls and channel status in `/api/status`** + +Replace hard-coded: + +```python +"channels": [{"name": "web", "enabled": True}], +``` + +with: + +```python +runtime = getattr(request.app.state, "channel_runtime", None) +"channels": runtime.statuses() if runtime is not None else [], +"runtime_controls": { + "self_restart": _self_restart_enabled(), +}, +``` + +- [ ] **Step 5: Make `run_gateway()` a compatibility wrapper** + +In `gateway/main.py`, keep public signature if practical, but route through `ChannelRuntime` for lifecycle. Existing tests that inject `channels` can build a runtime with those adapters registered or can be rewritten to target `ChannelRuntime` directly. + +- [ ] **Step 6: Verify backend route tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_gateway_channels.py tests/unit/test_channel_runtime.py -q +``` + +Expected: pass without real network or real LLM calls. + +## Task 9: Self-Restart API + +**Files:** +- Modify: `app-instance/backend/beaver/interfaces/web/app.py` +- Modify: `app-instance/create-instance.sh` +- Modify: `app-instance/backend/tests/unit/test_channel_runtime.py` + +- [ ] **Step 1: Add restart status and disabled behavior tests** + +Add: + +```python +def test_self_restart_env_defaults_enabled(monkeypatch) -> None: + from beaver.interfaces.web.app import _self_restart_enabled + + monkeypatch.delenv("BEAVER_ENABLE_SELF_RESTART", raising=False) + + assert _self_restart_enabled() is True + + +def test_self_restart_env_can_disable(monkeypatch) -> None: + from beaver.interfaces.web.app import _self_restart_enabled + + monkeypatch.setenv("BEAVER_ENABLE_SELF_RESTART", "0") + + assert _self_restart_enabled() is False +``` + +- [ ] **Step 2: Implement restart helpers** + +In `web/app.py`: + +```python +def _self_restart_enabled() -> bool: + return os.getenv("BEAVER_ENABLE_SELF_RESTART", "1").strip() not in {"0", "false", "False"} + + +def _schedule_self_restart(delay_seconds: float = 0.75) -> None: + import os as _os + import threading + import time + + def _exit_later() -> None: + time.sleep(delay_seconds) + _os._exit(0) + + threading.Thread(target=_exit_later, daemon=True).start() +``` + +Add route: + +```python +@app.post("/api/runtime/restart") +async def restart_runtime() -> JSONResponse: + if not _self_restart_enabled(): + raise HTTPException(status_code=403, detail="Self restart is disabled") + _schedule_self_restart() + return JSONResponse({"ok": True, "restarting": True}, status_code=202) +``` + +- [ ] **Step 3: Ensure created containers keep default restart enabled** + +In `app-instance/create-instance.sh`, add to `RUN_ARGS`: + +```bash +-e "BEAVER_ENABLE_SELF_RESTART=1" +``` + +- [ ] **Step 4: Verify restart helper tests** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py::test_self_restart_env_defaults_enabled tests/unit/test_channel_runtime.py::test_self_restart_env_can_disable -q +``` + +Expected: pass. + +## Task 10: Nginx Timeout for Channel Webhooks + +**Files:** +- Modify: `app-instance/nginx.conf` + +- [ ] **Step 1: Add channel webhook location before generic `/api/`** + +In `app-instance/nginx.conf`, before `location /api/`, add: + +```nginx + location /api/channels/ { + proxy_pass http://127.0.0.1:18080; + proxy_read_timeout 1800; + proxy_send_timeout 1800; + } +``` + +- [ ] **Step 2: Check nginx config syntax if nginx is available** + +Run: + +```bash +nginx -t -c /home/ivan/xuan/beaver_project/app-instance/nginx.conf +``` + +Expected: syntax ok. If nginx is not installed locally, record that this check was skipped and rely on container verification later. + +## Task 11: Frontend API Types and Helpers + +**Files:** +- Modify: `app-instance/frontend/types/index.ts` +- Modify: `app-instance/frontend/lib/api.ts` + +- [ ] **Step 1: Update types** + +Replace `ChannelStatus` with: + +```ts +export interface ChannelStatus { + channel_id: string; + name?: string; + kind: string; + mode: string; + display_name: string; + enabled: boolean; + state: 'configured' | 'disabled' | 'starting' | 'running' | 'degraded' | 'error' | 'stopped'; + account_id: string; + last_error?: string | null; + last_event_at?: string | null; + started_at?: string | null; + capabilities: string[]; + webhook_url?: string | null; +} + +export interface ChannelEventRecord { + event_id: string; + channel_id: string; + kind: string; + session_id?: string | null; + message_id?: string | null; + run_id?: string | null; + status: string; + error?: string | null; + text_preview?: string | null; + text_length?: number; + created_at: string; + metadata?: Record; +} + +export interface RuntimeControls { + self_restart: boolean; +} +``` + +Add to `SystemStatus`: + +```ts +runtime_controls?: RuntimeControls; +``` + +- [ ] **Step 2: Add API helpers** + +In `app-instance/frontend/lib/api.ts`: + +```ts +export async function listChannelEvents(channelId: string, limit: number = 100): Promise { + return fetchJSON(`/api/channels/${encodeURIComponent(channelId)}/events?limit=${limit}`); +} + +export async function restartRuntime(): Promise<{ ok: boolean; restarting: boolean }> { + return fetchJSON('/api/runtime/restart', { + method: 'POST', + timeoutMs: 5000, + }); +} +``` + +Import `ChannelEventRecord`. + +- [ ] **Step 3: Typecheck frontend** + +Run: + +```bash +cd app-instance/frontend +npm run typecheck +``` + +Expected: pass. If this repo does not define `typecheck`, run `npx tsc --noEmit`. + +## Task 12: Status Page Channels and Restart UI + +**Files:** +- Modify: `app-instance/frontend/app/(app)/status/page.tsx` + +- [ ] **Step 1: Add UI state** + +Add imports: + +```ts +import { listChannelEvents, restartRuntime } from '@/lib/api'; +import type { ChannelEventRecord, ChannelStatus, ProviderStatus, SystemStatus } from '@/types'; +``` + +Add state: + +```ts +const [selectedChannel, setSelectedChannel] = useState(null); +const [channelEvents, setChannelEvents] = useState([]); +const [loadingChannelEvents, setLoadingChannelEvents] = useState(false); +const [restartOpen, setRestartOpen] = useState(false); +const [restarting, setRestarting] = useState(false); +const [restartError, setRestartError] = useState(null); +``` + +- [ ] **Step 2: Add handlers** + +```ts +const openChannelDetails = async (channel: ChannelStatus) => { + setSelectedChannel(channel); + setChannelEvents([]); + setLoadingChannelEvents(true); + try { + setChannelEvents(await listChannelEvents(channel.channel_id, 20)); + } catch { + setChannelEvents([]); + } finally { + setLoadingChannelEvents(false); + } +}; + +const handleRestart = async () => { + setRestarting(true); + setRestartError(null); + try { + await restartRuntime(); + setRestartOpen(false); + window.setTimeout(() => { + void loadStatus(); + }, 5000); + } catch (err: any) { + setRestartError(err.message || pickAppText(locale, '重启失败', 'Restart failed')); + } finally { + setRestarting(false); + } +}; +``` + +- [ ] **Step 3: Add restart button to Instance runtime card** + +Next to Runtime Logs: + +```tsx +{status.runtime_controls?.self_restart !== false ? ( + +) : null} +``` + +- [ ] **Step 4: Replace Channels card content** + +Replace the current chip grid with compact rows: + +```tsx +
+ {status.channels.length === 0 ? ( +

+ {pickAppText(locale, '尚未配置通道', 'No channels configured')} +

+ ) : ( + status.channels.map((ch) => ( + + )) + )} +
+``` + +- [ ] **Step 5: Add channel details dialog** + +Add a dialog after the provider dialog: + +```tsx + !open && setSelectedChannel(null)}> + + + {selectedChannel?.display_name || selectedChannel?.channel_id} + + {selectedChannel ? `${selectedChannel.kind}/${selectedChannel.mode} · ${selectedChannel.channel_id}` : ''} + + + {selectedChannel ? ( +
+
+ + + + +
+
+

{pickAppText(locale, '最近事件', 'Recent events')}

+ {loadingChannelEvents ? ( + + ) : ( +
+ {channelEvents.map((event) => ( +
+
+ {event.kind} + {event.created_at} +
+
+ {event.status}{event.error ? ` · ${event.error}` : ''} +
+
+ ))} + {channelEvents.length === 0 ? ( +

+ {pickAppText(locale, '暂无事件', 'No events yet')} +

+ ) : null} +
+ )} +
+
+ ) : null} +
+
+``` + +- [ ] **Step 6: Add restart confirmation dialog** + +```tsx + + + + {pickAppText(locale, '重启实例?', 'Restart instance?')} + + {pickAppText( + locale, + '应用会短暂不可用,正在运行的对话和通道请求可能会中断。', + 'The app will be unavailable briefly. Running chats and channel requests may be interrupted.' + )} + + + {restartError ?

{restartError}

: null} + + + + +
+
+``` + +- [ ] **Step 7: Verify frontend** + +Run: + +```bash +cd app-instance/frontend +npm test -- --run +npx tsc --noEmit +``` + +Expected: pass. + +## Task 13: End-to-End Backend Verification + +**Files:** +- Modify: backend tests as needed. + +- [ ] **Step 1: Run backend unit tests for channel area** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_channel_runtime.py tests/unit/test_gateway_channels.py tests/unit/test_config_loader.py tests/unit/test_imports.py -q +``` + +Expected: pass. + +- [ ] **Step 2: Run broader backend tests impacted by AgentService/session context** + +Run: + +```bash +cd app-instance/backend +uv run pytest tests/unit/test_agent_loop.py tests/unit/test_initial_skill_tool_hints.py tests/unit/test_marketplace_and_mcp.py -q +``` + +Expected: pass. + +- [ ] **Step 3: Manually verify generic webhook in local app** + +Start backend/frontend using the existing app-instance workflow. With config containing: + +```json +{ + "channels": { + "webhook-dev": { + "enabled": true, + "kind": "webhook", + "mode": "webhook", + "accountId": "local", + "displayName": "Webhook Dev", + "config": { + "responseTimeoutSeconds": 1800 + }, + "secrets": {} + } + } +} +``` + +Send: + +```bash +curl -sS -X POST http://127.0.0.1:18080/api/channels/webhook-dev/webhook \ + -H 'Content-Type: application/json' \ + -d '{"peer_id":"demo-user","thread_id":"main","message_id":"msg-001","text":"hello","peer_type":"dm"}' +``` + +Expected shape: + +```json +{ + "ok": true, + "duplicate": false, + "pending": false, + "session_id": "webhook-dev:local:demo-user:main", + "reply": "..." +} +``` + +Repeat the same curl. Expected `duplicate: true` and cached/pending response, with no second agent execution. + +- [ ] **Step 4: Manually verify status UI** + +Open `/status` and confirm: + +- Channel row shows `webhook-dev`, `webhook/webhook`, account `local`, state `running`. +- Details dialog shows webhook URL and recent events. +- Restart button opens confirmation dialog. +- Confirming restart calls `/api/runtime/restart` and the app comes back after container restart. + +## Acceptance Criteria + +- All channel inbound messages, including generic sync webhook, use the strict bus-first path: + +```text +adapter -> ChannelRuntime.accept_inbound() -> MessageBus.inbound -> bridge -> AgentService.handle_inbound_message() -> MessageBus.outbound -> ChannelManager.dispatch_outbound() -> adapter.send() +``` + +- No adapter directly calls `AgentService`. +- No adapter directly publishes to `MessageBus`. +- New session ids always use `::[:]`. +- Missing `ChannelIdentity`, missing `account_id`, or missing `peer_id` rejects inbound and records `inbound_rejected`. +- Runtime overwrites inconsistent inbound `session_id` and records `session_id_normalized`. +- Generic webhook accepts only fixed-schema text payloads. +- Generic webhook default response timeout is 1800 seconds. +- Generic webhook timeout returns HTTP 202 and does not cancel the backend run. +- Late generic webhook outbound is recorded as `outbound_unclaimed`. +- Dedupe cache returns pending/done/error for repeated `message_id` without rerunning the agent. +- Event log is persisted under workspace and does not store raw payloads or full conversation text. +- `/api/status` returns runtime channel status and runtime controls. +- `/status` displays compact channel status, details events on demand, and has a confirmed restart button. +- `POST /api/runtime/restart` is enabled by default and can be disabled with `BEAVER_ENABLE_SELF_RESTART=0`. +- AuthZ, real platform adapters, cron proactive delivery, streaming/progress, attachments, hot reload, and config UI are not implemented in v1. + +## V2 Backlog + +- Telegram adapter. +- Slack adapter. +- WhatsApp adapter. +- Platform signature validation and later AuthZ integration if needed. +- Channel config UI with secret masking. +- Hot reload or adapter restart API. +- Cron proactive channel delivery. +- Attachment handling. +- Streaming, typing, and progress messages where platform supports them. +- Retry/replay UI for failed outbound delivery. +- Rich event filtering/search. + +## Execution Options + +Plan complete and saved to `docs/superpowers/plans/2026-06-01-channel-runtime-v1.md`. + +Two execution options: + +1. **Subagent-Driven (recommended)** - Dispatch a fresh subagent per task, review between tasks, fast iteration. +2. **Inline Execution** - Execute tasks in this session using executing-plans, batch execution with checkpoints. + +Which approach?