"""Beaver 运行时上下文装配器。 这个模块是 `session` 和 `provider` 之间的中间层,职责非常明确: 1. 把运行前已经准备好的静态/半静态上下文拼成一份稳定的 system prompt 2. 把从 session 事件流里裁剪出的“可见历史”和当前用户输入整理成 provider 可直接消费的 messages 3. 在 tool loop 中,持续把 assistant/tool 消息按统一格式追加回消息数组 为什么这层必须单独存在: 1. `AgentLoop` 不应该自己拼 prompt,否则很快又会长成一个大文件 2. `memory`、`skills`、`session` 的注入顺序需要固定,否则模型行为会漂移 3. tool loop 前后追加消息的格式必须统一,否则不同 provider 很容易出兼容问题 这一版 builder 的设计目标是“最小但稳定”: 1. 先服务单 agent 主链 2. 先支持 frozen curated memory,而不是 live memory 3. skills 通过显式激活消息注入,不在这里做磁盘扫描 4. 为后续 channel / gateway / team metadata 预留注入位,但不提前做复杂逻辑 """ from __future__ import annotations import json from dataclasses import dataclass, field from typing import Any from beaver.memory.curated.snapshot import MemorySnapshot BEAVER_USER_ASSISTANT_IDENTITY_PROMPT = ( "You are 海狸 (Beaver), an AI assistant developed by 博维资讯系统有限公司. " "When communicating with users, keep this identity consistent. " "If users ask who you are, say that you are 海狸 (Beaver), 博维资讯系统有限公司研发的 AI 助手." ) @dataclass(slots=True) class SkillContext: """单个已激活 skill 的最小表示。 这里故意不把 skill 设计成复杂对象,只保留 builder 真正关心的两部分: - `name`:用于生成激活提示 - `content`:skill 的完整正文 注意:skill 正文不再塞进 system prompt,而是转成显式消息注入。 """ name: str content: str version: str = "legacy" content_hash: str = "" activation_reason: str = "selected" tool_hints: list[str] = field(default_factory=list) @dataclass(slots=True) class SessionContext: """当前运行轮次的会话元数据。 这不是 session store 里的完整 record,而是 prompt builder 关心的那一小部分: - 哪个 session - 来源是什么 - 当前使用什么 model - 是否有 channel/chat/user 这类运行路由信息 把它单独抽出来的原因是: 1. builder 不应该知道 SQLite row 长什么样 2. 不同入口(CLI/Web/Gateway)都可以把自己的 metadata 收敛成同一种结构 """ session_id: str | None = None source: str | None = None model: str | None = None user_id: str | None = None channel: str | None = None chat_id: str | None = None parent_session_id: str | None = None @dataclass(slots=True) class RuntimeContext: """Per-run runtime facts that should be visible to the model.""" utc_datetime: str local_datetime: str timezone: str | None = None utc_offset: str | None = None @dataclass(slots=True) class ContextBuildInput: """一次上下文构建所需的全部输入。 这个对象的作用不是“炫技式封装”,而是把主链里零散的数据显式收口。 这样一来,后面 `AgentLoop.process_direct()` 在组装参数时会更清晰,也更容易测试。 字段分组: - 身份/基础段:`base_system_prompt` - 会话可见历史:`history` - 当前输入:`current_user_input` - 冻结记忆:`memory_snapshot` - 技能:`activated_skills` - 运行元数据:`session_context` / `execution_context` - 额外扩展:`extra_sections` """ base_system_prompt: str = "" history: list[dict[str, Any]] = field(default_factory=list) current_user_input: str | list[dict[str, Any]] | None = None memory_snapshot: MemorySnapshot | None = None activated_skills: list[SkillContext] = field(default_factory=list) session_context: SessionContext | None = None runtime_context: RuntimeContext | None = None execution_context: str | None = None extra_sections: list[str] = field(default_factory=list) @dataclass(slots=True) class ContextBuildResult: """一次上下文构建后的结果。 保留 `system_prompt` 的原因: 1. `SessionManager.update_system_prompt()` 需要把最终注入的 prompt snapshot 落盘 2. 调试时经常需要区分“system prompt 长什么样”和“messages 长什么样” 3. 后面如果做 prompt audit / replay,也会直接复用这个结果 """ system_prompt: str messages: list[dict[str, Any]] class ContextBuilder: """负责把运行时输入装配成稳定上下文。 这一层故意保持“无 IO、无数据库、无网络”: - 不直接读 session store - 不直接读 memory store - 不直接扫描 skills 目录 这样 builder 的行为只由输入决定,便于单测,也便于后面并到真正的 AgentLoop 主链里。 """ def build_system_prompt( self, build_input: ContextBuildInput, ) -> str: """构建 system prompt。 顺序固定非常重要,当前约定是: 1. Beaver user-facing assistant identity 2. base system prompt 3. session metadata 4. runtime date/time 5. execution context 6. frozen memory snapshot 7. extra sections 这样设计的原因: - 身份与总规则要最靠前 - session/execution 是本轮运行语境,优先级高于长期记忆 - memory 必须是 frozen snapshot,避免中途写 memory 后 prompt 失真 - activated skill 正文放到显式消息里,避免 system prompt 持续膨胀 """ sections: list[str] = [BEAVER_USER_ASSISTANT_IDENTITY_PROMPT] base_system_prompt = (build_input.base_system_prompt or "").strip() if base_system_prompt: sections.append(base_system_prompt) session_section = self._render_session_section(build_input.session_context) if session_section: sections.append(session_section) runtime_section = self._render_runtime_section(build_input.runtime_context) if runtime_section: sections.append(runtime_section) execution_context = (build_input.execution_context or "").strip() if execution_context: sections.append(f"# Execution Context\n\n{execution_context}") if build_input.memory_snapshot is not None: # 这里明确只读 frozen snapshot,而不是去读 live memory store。 # 否则一旦当前会话中途写 memory,system prompt 语义就会和会话开头不一致。 snapshot_sections = build_input.memory_snapshot.as_prompt_sections() if snapshot_sections: sections.extend(snapshot_sections) for extra in build_input.extra_sections: cleaned = (extra or "").strip() if cleaned: sections.append(cleaned) return "\n\n---\n\n".join(sections) def build_messages( self, build_input: ContextBuildInput, ) -> ContextBuildResult: """构建一次模型调用的完整 messages。 这里做三件事: 1. 先生成最终 system prompt 2. 把已激活 skill 的完整正文作为显式消息注入 3. 把历史消息按原顺序接到后面 4. 如果存在当前用户输入,则把本轮输入追加为最后一条 user message 注意: - `history` 默认被视为“已经由 session/context 上游从完整事件流中裁剪好的可见结构” - builder 不负责裁剪历史窗口,这件事应由 session/loop 上层决定 - builder 只做最小格式统一 """ system_prompt = self.build_system_prompt(build_input) messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}] messages.extend(self.build_skill_activation_messages(build_input.activated_skills)) for message in build_input.history: # 当前 builder 自己负责生成唯一的 system prompt。 # 如果上游 history 已经混入 system 消息,这里要主动跳过,避免双 system。 if message.get("role") == "system": continue messages.append(self._provider_history_message(message)) if build_input.current_user_input is not None: messages.append( { "role": "user", "content": build_input.current_user_input, } ) return ContextBuildResult( system_prompt=system_prompt, messages=messages, ) @staticmethod def _provider_history_message(message: dict[str, Any]) -> dict[str, Any]: """Keep persisted UI/audit fields out of provider message payloads.""" allowed = {"role", "content", "tool_calls", "tool_call_id", "name"} clean = {key: value for key, value in message.items() if key in allowed} if "name" not in clean and message.get("tool_name"): clean["name"] = message.get("tool_name") if isinstance(clean.get("tool_calls"), list): clean["tool_calls"] = ContextBuilder._provider_tool_calls(clean["tool_calls"]) return clean @staticmethod def _provider_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]: """Normalize persisted tool calls to OpenAI-compatible provider payloads.""" normalized: list[dict[str, Any]] = [] for tool_call in tool_calls: if not isinstance(tool_call, dict): continue clean = dict(tool_call) function = clean.get("function") if isinstance(function, dict): clean_function = dict(function) arguments = clean_function.get("arguments") if not isinstance(arguments, str): clean_function["arguments"] = json.dumps(arguments or {}, ensure_ascii=False, default=str) clean["function"] = clean_function normalized.append(clean) return normalized def add_tool_result( self, messages: list[dict[str, Any]], *, tool_call_id: str, tool_name: str, result: str, ) -> list[dict[str, Any]]: """向消息数组追加一条 tool result。 为什么这个函数放在 builder,而不是塞回 `AgentLoop`: - tool message 的结构必须和 provider 兼容 - 统一在这里追加,可以避免不同执行路径拼出不同字段名 - 后面如果要兼容更多 provider 差异,也只改这一层 这里返回原 list 本身,保持旧项目的“可链式追加”习惯。 """ messages.append( { "role": "tool", "tool_call_id": tool_call_id, "name": tool_name, "content": result, } ) return messages def add_assistant_message( self, messages: list[dict[str, Any]], *, content: str | None, tool_calls: list[dict[str, Any]] | None = None, reasoning_content: str | None = None, ) -> list[dict[str, Any]]: """向消息数组追加 assistant 消息。 这里有两个实现细节非常重要: 1. 无论 `content` 是否为空,都显式写入 `content` 键 原因是部分 provider 在 assistant 带 `tool_calls` 时仍要求消息里存在 `content` 2. `reasoning_content` 只有在非空时才附带 因为这属于思考模型扩展字段,不应污染普通 provider 路径 """ message: dict[str, Any] = { "role": "assistant", "content": content, } if tool_calls: message["tool_calls"] = self._provider_tool_calls(tool_calls) if reasoning_content is not None: message["reasoning_content"] = reasoning_content messages.append(message) return messages def _render_session_section(self, session_context: SessionContext | None) -> str | None: """把运行时 session metadata 渲染成一个可读 section。 这一段的目标不是让模型“记住所有数据库字段”,而是给它足够的当前运行语境。 常见用途包括: - 知道当前来自 CLI 还是 Web/Gateway - 知道当前使用什么 model - 知道当前 channel/chat_id,便于后续多渠道行为约束 """ if session_context is None: return None rows: list[str] = [] if session_context.session_id: rows.append(f"Session ID: {session_context.session_id}") if session_context.source: rows.append(f"Source: {session_context.source}") if session_context.model: rows.append(f"Model: {session_context.model}") if session_context.user_id: rows.append(f"User ID: {session_context.user_id}") if session_context.channel: rows.append(f"Channel: {session_context.channel}") if session_context.chat_id: rows.append(f"Chat ID: {session_context.chat_id}") if session_context.parent_session_id: rows.append(f"Parent Session ID: {session_context.parent_session_id}") if not rows: return None return "# Current Session\n\n" + "\n".join(rows) def _render_runtime_section(self, runtime_context: RuntimeContext | None) -> str | None: """Render date/time facts captured for the current model run.""" if runtime_context is None: return None rows: list[str] = [] if runtime_context.utc_datetime: rows.append(f"Current UTC time: {runtime_context.utc_datetime}") if runtime_context.local_datetime: rows.append(f"Current local time: {runtime_context.local_datetime}") if runtime_context.timezone: rows.append(f"Local timezone: {runtime_context.timezone}") if runtime_context.utc_offset: rows.append(f"Local UTC offset: {runtime_context.utc_offset}") if not rows: return None return ( "# Current Date and Time\n\n" + "\n".join(rows) + "\n\nUse this section as authoritative for relative date/time references such as " '"today", "tomorrow", "now", "this week", and "next month".' ) def build_skill_activation_messages(self, activated_skills: list[SkillContext]) -> list[dict[str, str]]: """把已激活 skill 转成显式消息。 关键区别: - system prompt 只保留轻量 skills index - 真正生效的 skill 正文通过额外消息块显式加载 这样模型不需要“从摘要里猜怎么读到正文”,而是直接拿到完整指导内容。 """ messages: list[dict[str, str]] = [] for skill in activated_skills: content = (skill.content or "").strip() if not content: continue messages.append( { "role": "user", "content": ( f'[SYSTEM: The "{skill.name}" skill (version {skill.version}) is active for this run. ' "Follow its instructions as active guidance unless the user overrides them.]\n\n" f"{content}" ), } ) return messages