Files
beaver_project/app-instance/backend/beaver/interfaces/channels/runtime.py
steven_li 4b0bf65ace ```
feat(engine): 优化智能体循环中的助手消息处理逻辑

- 在没有工具调用时才添加助手消息到上下文
- 确保工具调用响应正确添加到消息上下文中
- 修复了消息构建的条件逻辑

fix(cron): 改进定时任务调度的时间解析功能

- 添加正则表达式导入用于时间显示解析
- 实现从显示文本中提取毫秒间隔的功能
- 增强整数转换的安全性,避免类型错误
- 优化定时任务配置的解析逻辑

feat(outlook): 增强Outlook集成的功能和稳定性

- 将默认超时时间从10秒增加到180秒
- 为状态检查函数添加可选的验证参数
- 串行执行邮件概览获取操作而非并行
- 改进连接状态验证逻辑

feat(channel): 添加设备名称作为会话标识的选项

- 为终端WebSocket适配器添加新的配置选项
- 实现基于设备名称生成会话对等ID的功能
- 记录原始对等ID和设备名称的元数据
- 支持从设备名称创建会话对等ID

feat(skills): 完善技能学习评估系统和进度跟踪

- 在应用启动时自动调度待评估的技能草稿
- 为技能评估工作创建独立的循环工厂
- 实现异步技能评估任务的取消和清理机制
- 添加技能评估进度报告和状态跟踪功能
- 扩展会话列表API以包含更多详细信息
- 防止对不存在的会话进行操作
- 优化技能草稿提交和评估的业务逻辑

perf(skills): 提升技能评估的并发性能

- 实现并行技能案例评估以提高效率
- 添加最大并行案例数的环境变量控制
- 实现实时评估进度更新和回调机制
- 优化评估过程中的资源管理和同步

refactor(services): 创建隔离的智能体循环实例

- 添加创建独立智能体循环的工厂方法
- 确保新循环继承运行时服务配置
- 支持技能评估等需要隔离环境的场景
```
2026-06-15 14:48:16 +08:00

531 lines
22 KiB
Python

"""Channel runtime host for adapter lifecycle and bus-first routing."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from beaver.foundation.config.schema import ChannelConfig
from beaver.foundation.events import InboundMessage, MessageBus, OutboundMessage
from beaver.interfaces.channels.base import ChannelAdapter
from beaver.interfaces.channels.manager import ChannelManager
from beaver.interfaces.channels.state import ChannelDedupeStore, ChannelEventLog
from beaver.services.agent_service import AgentService
def _iso_now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()
def _channel_capabilities(kind: str, mode: str) -> list[str]:
    """Return the capability tags advertised for a channel kind/mode pair.

    Args:
        kind: Channel kind, e.g. ``"webhook"``, ``"terminal"``, ``"telegram"``.
        mode: Transport mode; only consulted for the ``"terminal"`` kind.

    Returns:
        A freshly built list of capability names, or an empty list when the
        kind/mode combination is not recognised.
    """
    if kind == "webhook":
        return ["receive_text", "send_text", "sync_webhook_response"]
    if kind == "terminal" and mode == "websocket":
        return ["receive_text", "send_text", "persistent_connection"]
    if kind in {"feishu", "qqbot", "telegram"}:
        return ["receive_text", "send_text", "receive_media", "groups"]
    if kind == "weixin":
        return ["receive_text", "send_text", "receive_media", "direct_messages"]
    return []
@dataclass(slots=True)
class ChannelAcceptResult:
accepted: bool
duplicate: bool = False
pending: bool = False
rejected: bool = False
session_id: str | None = None
dedupe_key: str | None = None
record: dict[str, Any] | None = None
error: str | None = None
class ChannelRuntime:
    """Own channel adapters, state, and the inbound/outbound bus bridge.

    The runtime builds one adapter per enabled channel config, registers it
    with a ``ChannelManager``, and runs two background tasks: one that feeds
    inbound bus messages to the agent service and one that dispatches
    outbound messages through the manager.
    """

    def __init__(
        self,
        *,
        service: AgentService,
        workspace: Path,
        channels: dict[str, ChannelConfig],
        bus: MessageBus | None = None,
    ) -> None:
        """Set up in-memory state and the on-disk dedupe/event stores.

        Args:
            service: Agent service that handles each inbound message.
            workspace: Root directory; state files live under
                ``<workspace>/state/channels``.
            channels: Channel configs keyed by channel id (shallow-copied).
            bus: Message bus to use; a new ``MessageBus`` is created when
                omitted.
        """
        self.service = service
        self.workspace = Path(workspace)
        self.bus = bus or MessageBus()
        self.manager = ChannelManager(self.bus)
        self.channel_configs = dict(channels)
        self.adapters: dict[str, ChannelAdapter] = {}
        self.states: dict[str, dict[str, Any]] = {}
        state_dir = self.workspace / "state" / "channels"
        retention = self._default_dedupe_retention_hours()
        self.dedupe = ChannelDedupeStore(state_dir / "dedupe.json", retention_hours=retention)
        self.events = ChannelEventLog(state_dir / "events.jsonl")
        self._bridge_task: asyncio.Task[None] | None = None
        self._dispatch_task: asyncio.Task[None] | None = None
        self._stop_event = asyncio.Event()
        self._dispatch_stop_event = asyncio.Event()
        self._lifecycle_lock = asyncio.Lock()

    async def start(self) -> None:
        """Start every enabled adapter and launch the bridge/dispatch tasks.

        A failure while building or starting one adapter is recorded in
        ``self.states`` and the event log and does not prevent the other
        channels from starting.
        """
        self._stop_event.clear()
        self._dispatch_stop_event.clear()
        for channel_id, cfg in self.channel_configs.items():
            if not cfg.enabled:
                self.states[channel_id] = {"state": "disabled", "last_error": None}
                continue
            try:
                adapter = self._build_adapter(channel_id, cfg)
                # NOTE(review): the adapter is registered before start(), so an
                # adapter whose start() raises stays in self.adapters and in the
                # manager. add_channel() registers only after a successful
                # start — confirm which behaviour is intended.
                self.adapters[channel_id] = adapter
                self.manager.register(adapter)
                await adapter.start()
                self.states[channel_id] = {
                    "state": "running",
                    "last_error": None,
                    "started_at": _iso_now(),
                }
                self.events.record(channel_id=channel_id, kind="adapter_started")
            except Exception as exc:  # pragma: no cover - defensive startup isolation
                self.states[channel_id] = {"state": "error", "last_error": str(exc)}
                self.events.record(
                    channel_id=channel_id,
                    kind="adapter_error",
                    status="error",
                    error=str(exc),
                )
        self._bridge_task = asyncio.create_task(self._bridge_inbound_to_agent())
        self._dispatch_task = asyncio.create_task(
            self.manager.dispatch_outbound(
                self._dispatch_stop_event,
                on_delivered=self._record_outbound_delivered,
                on_failed=self._record_outbound_failed,
            )
        )

    async def stop(self) -> None:
        """Stop the background tasks, then the manager, and log the shutdown.

        The bridge task is cancelled first; only afterwards is the dispatcher
        asked to stop, with up to one second to finish before it is cancelled
        too. If either task had already failed, its exception still
        propagates from here, but only after the remaining teardown steps
        have run, so adapters are not left running.
        """
        self._stop_event.set()
        try:
            await self._stop_bridge_task()
        finally:
            try:
                await self._stop_dispatch_task()
            finally:
                await self.manager.stop()
                for channel_id in self.adapters:
                    self.events.record(channel_id=channel_id, kind="adapter_stopped")

    async def _stop_bridge_task(self) -> None:
        """Cancel the inbound bridge task and wait for it to unwind."""
        task, self._bridge_task = self._bridge_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _stop_dispatch_task(self) -> None:
        """Signal the dispatcher to stop; cancel it if it overruns one second."""
        self._dispatch_stop_event.set()
        task, self._dispatch_task = self._dispatch_task, None
        if task is None:
            return
        try:
            await asyncio.wait_for(task, timeout=1.0)
        except asyncio.TimeoutError:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def add_channel(self, channel_id: str, config: ChannelConfig) -> None:
        """Add or replace a channel while the runtime is live.

        Does nothing when the config is unchanged and an adapter already
        exists. A disabled config removes any running adapter and keeps the
        config with state ``"disabled"``. Otherwise the new adapter is started
        before it replaces the old one, and the old one is stopped last.
        Errors from building or starting the new adapter propagate to the
        caller and leave the existing adapter in place.
        """
        async with self._lifecycle_lock:
            current = self.channel_configs.get(channel_id)
            if current == config and channel_id in self.adapters:
                return
            if not config.enabled:
                await self._remove_channel_locked(channel_id)
                # Removal drops the config; put it back so the channel is
                # still listed, as disabled.
                self.channel_configs[channel_id] = config
                self.states[channel_id] = {"state": "disabled", "last_error": None}
                return
            adapter = self._build_adapter(channel_id, config)
            await adapter.start()
            old_adapter = self.adapters.get(channel_id)
            self.manager.replace_registered(adapter)
            self.adapters[channel_id] = adapter
            self.channel_configs[channel_id] = config
            self.states[channel_id] = {"state": "running", "last_error": None, "started_at": _iso_now()}
            self.events.record(channel_id=channel_id, kind="adapter_started")
            if old_adapter is not None and old_adapter is not adapter:
                await old_adapter.stop()

    async def remove_channel(self, channel_id: str) -> None:
        """Remove a channel and stop its adapter, under the lifecycle lock."""
        async with self._lifecycle_lock:
            await self._remove_channel_locked(channel_id)

    async def _remove_channel_locked(self, channel_id: str) -> None:
        """Drop a channel's adapter, registration, and config.

        The caller must already hold ``self._lifecycle_lock``. The channel's
        state entry is set to ``"removed"`` whether or not an adapter existed.
        """
        adapter = self.adapters.pop(channel_id, None)
        self.manager.unregister(channel_id)
        self.channel_configs.pop(channel_id, None)
        if adapter is not None:
            await adapter.stop()
            self.events.record(channel_id=channel_id, kind="adapter_stopped")
        self.states[channel_id] = {"state": "removed", "last_error": None}

    async def accept_inbound(self, message: InboundMessage) -> ChannelAcceptResult:
        """Validate, normalise, dedupe, and publish one inbound message.

        Steps, in order:
          1. Reject the message if it has no channel identity or the identity
             reports a validation error.
          2. Overwrite ``message.session_id`` and ``message.channel`` with the
             values derived from the identity.
          3. If the identity yields a dedupe key, mark it as processing; when
             a record already exists, return a duplicate result instead of
             publishing.
          4. Publish the message on the inbound bus.

        Every outcome is written to the event log.

        Returns:
            A ``ChannelAcceptResult`` describing whether the message was
            accepted, rejected, or recognised as a duplicate.
        """
        identity = message.channel_identity
        if identity is None:
            self.events.record(
                channel_id=message.channel,
                kind="inbound_rejected",
                status="error",
                error="channel_identity is required",
            )
            return ChannelAcceptResult(
                accepted=False,
                rejected=True,
                error="channel_identity is required",
            )
        validation_error = identity.validation_error()
        if validation_error:
            self.events.record(
                channel_id=identity.channel_id,
                kind="inbound_rejected",
                status="error",
                error=validation_error,
            )
            return ChannelAcceptResult(accepted=False, rejected=True, error=validation_error)
        expected_session_id = identity.session_id()
        if message.session_id != expected_session_id:
            self.events.record(
                channel_id=identity.channel_id,
                kind="session_id_normalized",
                session_id=expected_session_id,
                message_id=identity.message_id,
            )
        # The identity is authoritative for both session id and channel id.
        message.session_id = expected_session_id
        message.channel = identity.channel_id
        dedupe_key = identity.dedupe_key()
        if dedupe_key:
            write = self.dedupe.mark_processing(
                dedupe_key=dedupe_key,
                session_id=expected_session_id,
                message_id=identity.message_id or "",
            )
            if not write.created:
                record = write.record or {}
                self.events.record(
                    channel_id=identity.channel_id,
                    kind="inbound_duplicate",
                    session_id=expected_session_id,
                    message_id=identity.message_id,
                    status=str(record.get("status") or "processing"),
                )
                return ChannelAcceptResult(
                    accepted=False,
                    duplicate=True,
                    pending=record.get("status") == "processing",
                    session_id=expected_session_id,
                    dedupe_key=dedupe_key,
                    record=record,
                )
        self.events.record(
            channel_id=identity.channel_id,
            kind="inbound_accepted",
            session_id=expected_session_id,
            message_id=identity.message_id,
            text=message.content,
        )
        await self.bus.publish_inbound(message)
        return ChannelAcceptResult(
            accepted=True,
            session_id=expected_session_id,
            dedupe_key=dedupe_key,
        )

    def statuses(self) -> list[dict[str, Any]]:
        """Return one status dict per configured channel.

        Each entry combines the config, the runtime state, the capability
        list, the endpoint URL (webhook or terminal websocket only) and the
        timestamp of a recent event for that channel.
        """
        items: list[dict[str, Any]] = []
        recent = self.events.recent(limit=500)
        # Later entries overwrite earlier ones for the same channel —
        # presumably recent() is oldest-first so this keeps the newest event;
        # verify against ChannelEventLog.recent.
        last_by_channel = {event["channel_id"]: event for event in recent if event.get("channel_id")}
        for channel_id, cfg in self.channel_configs.items():
            state = self.states.get(channel_id, {"state": "configured", "last_error": None})
            capabilities = _channel_capabilities(cfg.kind, cfg.mode)
            webhook_url = None
            websocket_url = None
            connected_peers = 0
            if cfg.kind == "webhook":
                webhook_url = f"/api/channels/{channel_id}/webhook"
            elif cfg.kind == "terminal" and cfg.mode == "websocket":
                websocket_url = f"/api/channels/{channel_id}/ws"
                adapter = self.adapters.get(channel_id)
                if adapter is not None and hasattr(adapter, "status_extra"):
                    extra = adapter.status_extra()  # type: ignore[attr-defined]
                    connected_peers = int(extra.get("connected_peers") or 0)
            items.append(
                {
                    "channel_id": channel_id,
                    "name": channel_id,
                    "kind": cfg.kind,
                    "mode": cfg.mode,
                    "display_name": cfg.display_name or channel_id,
                    "enabled": cfg.enabled,
                    "state": state.get("state", "configured"),
                    "account_id": cfg.account_id,
                    "last_error": state.get("last_error"),
                    "started_at": state.get("started_at"),
                    "last_event_at": last_by_channel.get(channel_id, {}).get("created_at"),
                    "capabilities": capabilities,
                    "webhook_url": webhook_url,
                    "websocket_url": websocket_url,
                    "connected_peers": connected_peers,
                }
            )
        return items

    def recent_events(self, channel_id: str, *, limit: int = 100) -> list[dict[str, Any]]:
        """Return up to ``limit`` recent event-log entries for one channel."""
        return self.events.recent(channel_id=channel_id, limit=limit)

    def record_event(
        self,
        *,
        channel_id: str,
        kind: str,
        session_id: str | None = None,
        message_id: str | None = None,
        run_id: str | None = None,
        status: str = "ok",
        error: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """Write an entry to the channel event log.

        Passed to adapters as their ``event_recorder`` callback; all arguments
        are forwarded unchanged to ``ChannelEventLog.record``.
        """
        self.events.record(
            channel_id=channel_id,
            kind=kind,
            session_id=session_id,
            message_id=message_id,
            run_id=run_id,
            status=status,
            error=error,
            metadata=metadata,
        )

    def _build_adapter(self, channel_id: str, cfg: ChannelConfig) -> ChannelAdapter:
        """Instantiate the adapter that matches ``cfg.kind`` / ``cfg.mode``.

        Adapter modules are imported inside each branch, so a platform's
        module is only loaded when a channel of that kind is built.

        Raises:
            ValueError: If the kind/mode pair is unsupported, or if an
                external connector lacks its base URL, platform kind, or
                connection id.
        """
        if cfg.kind == "webhook" and cfg.mode == "webhook":
            from beaver.interfaces.channels.generic_webhook import GenericWebhookAdapter

            return GenericWebhookAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                response_timeout_seconds=float(cfg.config.get("response_timeout_seconds") or 1800),
            )
        if cfg.kind == "terminal" and cfg.mode == "websocket":
            from beaver.interfaces.channels.terminal_websocket import TerminalWebSocketAdapter

            return TerminalWebSocketAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                event_recorder=self.record_event,
                heartbeat_seconds=float(cfg.config.get("heartbeat_seconds") or 30),
                max_message_chars=int(cfg.config.get("max_message_chars") or 20000),
                # Both snake_case and camelCase spellings of the option are accepted.
                session_peer_from_device_name=bool(
                    cfg.config.get("session_peer_from_device_name")
                    or cfg.config.get("sessionPeerFromDeviceName")
                ),
            )
        if cfg.kind == "telegram" and cfg.mode in {"polling", "webhook"}:
            from beaver.interfaces.channels.platforms.telegram import TelegramAdapter

            return TelegramAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )
        if cfg.kind == "feishu" and cfg.mode in {"websocket", "webhook"}:
            from beaver.interfaces.channels.platforms.feishu import FeishuAdapter

            return FeishuAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )
        if cfg.kind == "qqbot" and cfg.mode == "websocket":
            from beaver.interfaces.channels.platforms.qqbot import QQBotAdapter

            return QQBotAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )
        if cfg.kind == "weixin" and cfg.mode == "polling":
            from beaver.interfaces.channels.platforms.weixin import WeixinAdapter

            return WeixinAdapter(
                channel_id=channel_id,
                kind=cfg.kind,
                mode=cfg.mode,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                inbound_sink=self,
                secrets=cfg.secrets,
                config=cfg.config,
                event_recorder=self.record_event,
            )
        if cfg.kind == "external_connector" and cfg.mode == "http":
            import os

            from beaver.interfaces.channels.connections.sidecar_client import ConnectorSidecarClient
            from beaver.interfaces.channels.external_connector import ExternalConnectorChannel

            # The channel config wins over the environment for the base URL;
            # the token is read from the environment only.
            base_url = str(cfg.config.get("sidecarBaseUrl") or os.getenv("EXTERNAL_CONNECTOR_BASE_URL") or "").strip()
            token = os.getenv("EXTERNAL_CONNECTOR_TOKEN", "")
            platform_kind = str(cfg.config.get("platformKind") or "").strip()
            connection_id = str(cfg.config.get("connectionId") or "").strip()
            if not base_url:
                raise ValueError("external connector sidecarBaseUrl is required")
            if not platform_kind:
                raise ValueError("external connector platformKind is required")
            if not connection_id:
                raise ValueError("external connector connectionId is required")
            return ExternalConnectorChannel(
                channel_id=channel_id,
                platform_kind=platform_kind,
                connection_id=connection_id,
                account_id=cfg.account_id,
                display_name=cfg.display_name,
                sidecar_client=ConnectorSidecarClient(base_url=base_url, token=token),
            )
        raise ValueError(f"Unsupported channel kind/mode: {cfg.kind}/{cfg.mode}")

    async def _bridge_inbound_to_agent(self) -> None:
        """Consume inbound bus messages and run each through the agent service.

        Runs until the stop event is set or the task is cancelled. Each
        message yields exactly one outbound message: the agent's reply, or an
        error reply built by ``AgentService.build_outbound_error`` when the
        run raised or was cancelled. The dedupe store is updated before the
        outbound message is published.
        """
        while not self._stop_event.is_set():
            try:
                # Short timeout so the stop event is re-checked regularly.
                inbound = await asyncio.wait_for(self.bus.consume_inbound(), timeout=0.25)
            except asyncio.TimeoutError:
                continue
            identity = inbound.channel_identity
            try:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_started",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                )
                outbound = await self.service.handle_inbound_message(inbound)
            except asyncio.CancelledError:
                # Cancelled mid-run: still answer the sender and settle the
                # dedupe record before letting the cancellation propagate.
                outbound = AgentService.build_outbound_error(
                    inbound,
                    detail="Channel runtime stopped before completing the inbound message",
                    finish_reason="cancelled",
                )
                self._mark_dedupe_result(inbound, outbound)
                await self.bus.publish_outbound(outbound)
                raise
            except Exception as exc:
                self.events.record(
                    channel_id=inbound.channel,
                    kind="direct_run_failed",
                    session_id=inbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    status="error",
                    error=str(exc),
                )
                outbound = AgentService.build_outbound_error(
                    inbound,
                    detail=str(exc),
                    finish_reason="error",
                )
            else:
                self.events.record(
                    channel_id=outbound.channel,
                    kind="direct_run_finished",
                    session_id=outbound.session_id,
                    message_id=identity.message_id if identity else inbound.message_id,
                    run_id=outbound.run_id,
                )
            self._mark_dedupe_result(inbound, outbound)
            await self.bus.publish_outbound(outbound)

    def _mark_dedupe_result(self, inbound: InboundMessage, outbound: OutboundMessage) -> None:
        """Store the final outcome of a run in the dedupe store.

        Does nothing when the inbound message has no identity or the identity
        yields no dedupe key. Size limits come from the channel's
        ``max_cached_reply_chars`` / ``max_cached_error_chars`` settings,
        defaulting to 20000 and 4000.
        """
        identity = inbound.channel_identity
        dedupe_key = identity.dedupe_key() if identity else None
        if not dedupe_key:
            return
        cfg = self.channel_configs.get(identity.channel_id)
        settings = cfg.config if cfg else {}
        max_reply_chars = int(settings.get("max_cached_reply_chars") or 20000)
        max_error_chars = int(settings.get("max_cached_error_chars") or 4000)
        # NOTE(review): only finish_reason "error" is stored as an error; a
        # "cancelled" outbound is stored via mark_done — confirm that is intended.
        if outbound.finish_reason == "error":
            self.dedupe.mark_error(
                dedupe_key=dedupe_key,
                error=outbound.content,
                max_error_chars=max_error_chars,
            )
        else:
            self.dedupe.mark_done(
                dedupe_key=dedupe_key,
                run_id=outbound.run_id,
                reply=outbound.content,
                max_reply_chars=max_reply_chars,
            )

    async def _record_outbound_delivered(self, message: OutboundMessage) -> None:
        """Log a dispatched outbound message.

        The event kind is ``outbound_unclaimed`` when the message metadata
        carries ``delivery_status == "unclaimed"``, else ``outbound_delivered``.
        """
        kind = "outbound_unclaimed" if message.metadata.get("delivery_status") == "unclaimed" else "outbound_delivered"
        self.events.record(
            channel_id=message.channel,
            kind=kind,
            session_id=message.session_id,
            message_id=message.channel_identity.message_id if message.channel_identity else message.message_id,
            run_id=message.run_id,
        )

    async def _record_outbound_failed(self, message: OutboundMessage, exc: Exception | None) -> None:
        """Log a failed outbound delivery.

        When ``exc`` is None the error text recorded is
        ``"channel not registered"``.
        """
        self.events.record(
            channel_id=message.channel,
            kind="outbound_delivery_failed",
            session_id=message.session_id,
            message_id=message.channel_identity.message_id if message.channel_identity else message.message_id,
            run_id=message.run_id,
            status="error",
            error=str(exc) if exc else "channel not registered",
        )

    def _default_dedupe_retention_hours(self) -> int:
        """Return the dedupe retention period, in hours.

        Uses the first non-None ``dedupe_retention_hours`` found while
        iterating the channel configs; falls back to 48 when none sets it.
        """
        for cfg in self.channel_configs.values():
            value = cfg.config.get("dedupe_retention_hours")
            if value is not None:
                return int(value)
        return 48