1071 lines
28 KiB
Markdown
1071 lines
28 KiB
Markdown
# nanobot Workflow
|
||
|
||
本文按当前仓库代码,整理 nanobot 的主要运行链路,重点说明:
|
||
|
||
1. 用户执行 `nanobot agent -m "你好"` 时,CLI 单轮模式到底走了什么路径。
|
||
2. `nanobot gateway` 常驻模式下,外部渠道、cron、heartbeat 如何进入同一套工作流。
|
||
3. Web 前端在 standalone 模式和 `create_app()` 预留的 gateway mode 下,分别如何判断并跳转不同链路。
|
||
4. 每个关键判断点的条件、分支结果和后续跳转。
|
||
|
||
|
||
## 0. 先纠正几个常见误解
|
||
|
||
在开始看流程前,先把几个和旧文档不一致的点说清楚:
|
||
|
||
1. `nanobot agent -m "你好"` 的默认 session 不是 `cli:default`,而是 `cli:direct`。
|
||
2. `agent -m` 单轮模式不会启动 `AgentLoop.run()` 主循环,也不会走 `bus.consume_inbound()` 常驻消费,而是直接调用 `process_direct()`。
|
||
3. `agent_loop.process_direct(message, session_id, ...)` 的第 2 个位置参数是 `session_key`,不是 `chat_id`。
|
||
4. 所以 CLI 单轮模式里:
|
||
- 会话 key 默认是 `cli:direct`
|
||
- `InboundMessage.channel` 默认是 `cli`
|
||
- `InboundMessage.chat_id` 默认是 `direct`
|
||
5. Agent 最大循环轮数不是固定写死 20,而是来自 `config.agents.defaults.max_tool_iterations`。
|
||
6. 工具数量也不是固定“9 个”:
|
||
- 固定基础工具会注册一批
|
||
- `cron_service` 存在时才注册 `cron`
|
||
- MCP 工具是运行时连接成功后动态追加
|
||
7. `nanobot agent` 不会自动执行 `_create_workspace_templates()`;模板补齐主要发生在 `onboard` 和 `web` 命令里。
|
||
8. `create_app()` 确实支持 “gateway mode + web_channel” 分支,但当前 CLI 里真正直接启动 Web 后端的是 `nanobot web`,它走的是 standalone 模式。
|
||
|
||
|
||
## 1. CLI 单轮模式:`nanobot agent -m "你好"`
|
||
|
||
这是当前最直接、最短的一条链路。
|
||
|
||
### 1.1 总览树
|
||
|
||
```text
|
||
用户执行: nanobot agent -m "你好"
|
||
│
|
||
├─ typer 解析命令行
|
||
│ ├─ 命中 @app.command() -> agent(...)
|
||
│ ├─ --message/-m 存在?
|
||
│ │ ├─ YES -> 单轮模式
|
||
│ │ └─ NO -> 进入交互模式(见第 2 章补充)
|
||
│
|
||
├─ load_config()
|
||
│ ├─ 默认读取 ~/.nanobot/config.json
|
||
│ ├─ 文件存在?
|
||
│ │ ├─ YES -> json.load()
|
||
│ │ ├─ 迁移旧字段 _migrate_config()
|
||
│ │ └─ Config.model_validate(data)
|
||
│ └─ NO / 读取失败 -> 返回默认 Config()
|
||
│
|
||
├─ _make_provider(config)
|
||
│ ├─ provider_name == openai_codex 或 model 以 openai-codex/ 开头?
|
||
│ │ ├─ YES -> OpenAICodexProvider
|
||
│ │ └─ NO
|
||
│ ├─ provider_name == custom?
|
||
│ │ ├─ YES -> CustomProvider
|
||
│ │ └─ NO
|
||
│ ├─ model 不是 bedrock/* 且 provider 没 API key 且 provider 也不是 OAuth?
|
||
│ │ ├─ YES -> console.print 错误并 typer.Exit(1)
|
||
│ │ └─ NO -> LiteLLMProvider
|
||
│
|
||
├─ MessageBus()
|
||
├─ CronService(jobs.json)
|
||
├─ AgentLoop(...)
|
||
│ ├─ PluginLoader
|
||
│ ├─ SkillsLoader
|
||
│ ├─ AgentRegistry
|
||
│ ├─ ContextBuilder
|
||
│ ├─ SessionManager
|
||
│ ├─ ToolRegistry
|
||
│ ├─ SubagentManager
|
||
│ ├─ DelegationManager
|
||
│ └─ _register_default_tools()
|
||
│
|
||
├─ asyncio.run(run_once())
|
||
│ └─ await agent_loop.process_direct("你好", session_key="cli:direct")
|
||
│
|
||
├─ process_direct(...)
|
||
│ ├─ await _connect_mcp()
|
||
│ ├─ InboundMessage(channel="cli", chat_id="direct", content="你好")
|
||
│ └─ await _process_message(msg, session_key="cli:direct")
|
||
│
|
||
├─ _process_message(...)
|
||
│ ├─ msg.channel == "system"?
|
||
│ │ ├─ YES -> 走 system 内部任务分支
|
||
│ │ └─ NO -> 走普通用户消息分支
|
||
│ ├─ sessions.get_or_create("cli:direct")
|
||
│ ├─ cmd == "/new"?
|
||
│ │ ├─ YES -> 强制归档 + clear + 返回 "New session started."
|
||
│ │ └─ NO
|
||
│ ├─ cmd == "/help"?
|
||
│ │ ├─ YES -> 直接返回帮助文本
|
||
│ │ └─ NO
|
||
│ ├─ 未归档消息 >= memory_window 且当前未在归档中?
|
||
│ │ ├─ YES -> create_task 后台归档
|
||
│ │ └─ NO
|
||
│ ├─ _set_tool_context()
|
||
│ ├─ build_messages()
|
||
│ ├─ _run_agent_loop()
|
||
│ ├─ _save_turn()
|
||
│ ├─ message 工具本轮已直接发送过消息?
|
||
│ │ ├─ YES -> return None
|
||
│ │ └─ NO -> return OutboundMessage(final_content)
|
||
│
|
||
├─ process_direct() 拿到 OutboundMessage.content
|
||
├─ console.print("🐈 ...")
|
||
└─ await agent_loop.close_mcp() -> 程序退出
|
||
```
|
||
|
||
### 1.2 关键步骤展开
|
||
|
||
#### Step 1: `typer` 进入 `agent(...)`
|
||
|
||
入口函数是 `nanobot/cli/commands.py` 里的 `agent()`。
|
||
|
||
关键判断:
|
||
|
||
1. `message` 参数是否存在
|
||
2. `logs` 是否开启
|
||
|
||
分支结果:
|
||
|
||
1. `message` 存在:
|
||
- 进入 `run_once()`
|
||
- 直接 `await agent_loop.process_direct(...)`
|
||
- 不启动 `bus` 常驻消费循环
|
||
2. `message` 不存在:
|
||
- 进入交互模式
|
||
- 单独启动 `agent_loop.run()` 和 CLI 的 inbound/outbound 桥接
|
||
|
||
#### Step 2: 配置加载 `load_config()`
|
||
|
||
入口在 `nanobot/config/loader.py`。
|
||
|
||
判断顺序:
|
||
|
||
1. 是否传入显式 `config_path`
|
||
- 没传则默认 `~/.nanobot/config.json`
|
||
2. 文件是否存在
|
||
- 不存在:直接返回默认 `Config()`
|
||
3. JSON 是否可解析
|
||
- 失败:打印 warning,回退默认 `Config()`
|
||
4. 旧字段是否需要迁移
|
||
- 例如把 `tools.exec.restrictToWorkspace` 搬到 `tools.restrictToWorkspace`
|
||
5. `Config.model_validate(data)` 是否通过
|
||
- 通过:得到结构化配置对象
|
||
- 不通过或出错:回退默认配置
|
||
|
||
#### Step 3: Provider 选择 `_make_provider(config)`
|
||
|
||
这里是第一处显式“多分支跳转”。
|
||
|
||
判断顺序如下:
|
||
|
||
1. `provider_name == "openai_codex"` 或 `model.startswith("openai-codex/")`
|
||
- 结果:创建 `OpenAICodexProvider`
|
||
- 跳转:后续统一交给 `AgentLoop`
|
||
|
||
2. `provider_name == "custom"`
|
||
- 结果:创建 `CustomProvider`
|
||
- 跳转:后续统一交给 `AgentLoop`
|
||
|
||
3. 其余 provider
|
||
- 先查 provider registry
|
||
- 判断是否需要 API key
|
||
|
||
4. API key 校验条件:
|
||
- `model` 不是 `bedrock/*`
|
||
- 并且 provider 配置里没有 `api_key`
|
||
- 并且 provider spec 也不是 OAuth provider
|
||
|
||
5. API key 校验结果:
|
||
- 条件成立:报错并 `typer.Exit(1)`
|
||
- 条件不成立:创建 `LiteLLMProvider`
|
||
|
||
#### Step 4: `AgentLoop(...)` 初始化
|
||
|
||
当前版本的 `AgentLoop` 初始化不再只是 `ContextBuilder + SessionManager + ToolRegistry + SubagentManager`,而是已经扩成多 agent 运行时。
|
||
|
||
初始化顺序大致如下:
|
||
|
||
1. 保存基础配置:
|
||
- `bus`
|
||
- `provider`
|
||
- `workspace`
|
||
- `model`
|
||
- `max_iterations`
|
||
- `temperature`
|
||
- `max_tokens`
|
||
- `memory_window`
|
||
- `exec_config`
|
||
- `a2a_config`
|
||
|
||
2. 创建运行时依赖:
|
||
- `PluginLoader`
|
||
- `SkillsLoader`
|
||
- `AgentRegistry`
|
||
- `ContextBuilder`
|
||
- `SessionManager`
|
||
- `ToolRegistry`
|
||
- `SubagentManager`
|
||
- `DelegationManager`
|
||
|
||
3. 注册默认工具 `_register_default_tools()`
|
||
|
||
当前注册逻辑是“条件式”的:
|
||
|
||
1. 一定注册:
|
||
- `read_file`
|
||
- `write_file`
|
||
- `edit_file`
|
||
- `list_dir`
|
||
- `exec`
|
||
- `web_search`
|
||
- `web_fetch`
|
||
- `message`
|
||
- `spawn`
|
||
|
||
2. 条件注册:
|
||
- `cron_service` 存在时,注册 `cron`
|
||
|
||
3. 运行时动态注册:
|
||
- MCP server 连接成功后,额外注册 `mcp_<server>_<tool>` 包装工具
|
||
|
||
#### Step 5: `process_direct(...)`
|
||
|
||
CLI 单轮模式走的是这条直连链路。
|
||
|
||
执行顺序:
|
||
|
||
1. `_connect_mcp()`
|
||
- 如果 `_mcp_connected=True`:直接返回
|
||
- 如果 `_mcp_connecting=True`:直接返回
|
||
- 如果没有 MCP 配置:直接返回
|
||
- 否则尝试连接 MCP server,并把远端工具注册进当前 `ToolRegistry`
|
||
|
||
2. 构造 `InboundMessage`
|
||
- `channel="cli"`
|
||
- `sender_id="user"`
|
||
- `chat_id="direct"`
|
||
- `content="你好"`
|
||
|
||
3. 进入 `_process_message(msg, session_key="cli:direct")`
|
||
|
||
注意:
|
||
|
||
1. 这里 `session_key` 是 `cli:direct`
|
||
2. 但消息对象本身的 `chat_id` 仍然是默认 `"direct"`
|
||
3. 所以单轮 CLI 的会话持久化 key 和当前消息路由上下文,是同时存在的两个概念
|
||
|
||
#### Step 6: `_process_message(...)`
|
||
|
||
这是整个运行时的主入口。
|
||
|
||
判断顺序如下:
|
||
|
||
1. `msg.channel == "system"`?
|
||
- YES:
|
||
- 把 `msg.chat_id` 解释成 `"{channel}:{chat_id}"`
|
||
- 走内部任务分支
|
||
- 常见来源:委派结果回流、后台公告等
|
||
- NO:
|
||
- 继续普通消息分支
|
||
|
||
2. 会话 key 选择:
|
||
- 如果显式传了 `session_key`,优先用它
|
||
- 否则用 `msg.session_key`
|
||
- `msg.session_key` 的规则是:
|
||
- 若有 `session_key_override`,用 override
|
||
- 否则用 `f"{channel}:{chat_id}"`
|
||
|
||
3. 内建命令拦截:
|
||
- `cmd == "/new"`:
|
||
- 先强制做记忆归档
|
||
- 成功后清空会话
|
||
- 直接返回 `"New session started."`
|
||
- `cmd == "/help"`:
|
||
- 直接返回帮助文本
|
||
- 其他内容:
|
||
- 继续进入模型链路
|
||
|
||
4. 归档触发判断:
|
||
- `unconsolidated >= memory_window`
|
||
- 并且当前会话不在 `_consolidating`
|
||
- 成立则异步 `create_task` 做后台归档
|
||
|
||
5. 工具上下文注入 `_set_tool_context(...)`
|
||
- `message` 工具拿到 `channel/chat_id/message_id`
|
||
- `spawn` 工具拿到 `channel/chat_id/announce_via_bus`
|
||
- `cron` 工具拿到 `channel/chat_id/session_key`
|
||
|
||
6. 附加工具判断:
|
||
- `extra_tools` 是否存在
|
||
- 存在:`self.tools.clone()` 后再注册临时工具
|
||
- 不存在:直接用 `self.tools`
|
||
|
||
7. 构造 prompt `context.build_messages(...)`
|
||
- `system prompt`
|
||
- `history`
|
||
- `current user message`
|
||
- `media`
|
||
|
||
#### Step 7: `ContextBuilder.build_messages(...)`
|
||
|
||
这里的判断主要发生在 `build_system_prompt(...)` 内。
|
||
|
||
拼装顺序:
|
||
|
||
1. `_get_identity()`
|
||
- 当前时间
|
||
- 时区
|
||
- 运行平台
|
||
- workspace 路径
|
||
|
||
2. `_load_bootstrap_files()`
|
||
- 按顺序读取:
|
||
- `AGENTS.md`
|
||
- `SOUL.md`
|
||
- `USER.md`
|
||
- `TOOLS.md`
|
||
- `IDENTITY.md`
|
||
- 文件存在才追加
|
||
|
||
3. `memory.get_memory_context()`
|
||
- 有内容才追加 `# Memory`
|
||
|
||
4. `skills.get_always_skills()`
|
||
- 有 always skills 才把全文注入
|
||
|
||
5. `skills.build_skills_summary()`
|
||
- 有技能摘要才注入 `# Skills`
|
||
|
||
6. `agent_registry.build_agents_summary()`
|
||
- 仅当 `ContextBuilder` 持有 `agent_registry`
|
||
- 且当前有可用 agent
|
||
- 才注入 `# Available Agents`
|
||
|
||
7. `execution_context`
|
||
- 只在 cron/system task 等场景显式传入
|
||
- 普通 CLI 对话通常为空
|
||
|
||
8. 最终 message 拼装:
|
||
- 第 1 条固定 `system`
|
||
- 后面追加历史消息
|
||
- 最后一条追加当前 `user`
|
||
|
||
#### Step 8: Agent 循环 `_run_agent_loop(...)`
|
||
|
||
这是第二个最核心的判断分支。
|
||
|
||
循环条件:
|
||
|
||
1. `iteration < self.max_iterations`
|
||
2. 每一轮都执行 `provider.chat(messages, tools, model, ...)`
|
||
|
||
分支判断:
|
||
|
||
1. `response.has_tool_calls == True`
|
||
- 如果有 `on_progress`:
|
||
- 先发清洗后的文本进度
|
||
- 再发工具提示 `_tool_hint(...)`
|
||
- 把 assistant 的 tool call 意图写入 messages
|
||
- 逐个执行工具
|
||
- 每个工具结果再写回 messages
|
||
- 回到下一轮继续问模型
|
||
|
||
2. `response.has_tool_calls == False`
|
||
- `final_content = response.content`
|
||
- break,循环结束
|
||
|
||
3. 超过最大轮数仍未收敛
|
||
- 使用兜底文本
|
||
- 也会把兜底回复追加进 messages
|
||
|
||
#### Step 9: 会话保存和最终返回
|
||
|
||
执行顺序:
|
||
|
||
1. `_save_turn(session, all_msgs, skip=1+len(history))`
|
||
- 把本轮新增 assistant/tool/final 消息写进 session
|
||
- 工具结果过长会截断
|
||
|
||
2. `sessions.save(session)`
|
||
- 持久化到 `<workspace>/sessions/*.jsonl`
|
||
|
||
3. `message_tool._sent_in_turn` 判断
|
||
- YES:
|
||
- 说明模型已经主动通过 `message` 工具把消息发出
|
||
- 为避免重复发,返回 `None`
|
||
- NO:
|
||
- 返回 `OutboundMessage(content=final_content)`
|
||
|
||
4. `process_direct()` 只取 `response.content`
|
||
- CLI 单轮模式最终直接 `console.print(...)`
|
||
|
||
|
||
## 2. CLI 交互模式:`nanobot agent`(无 `-m`)
|
||
|
||
这条链路和单轮模式最大的区别是:
|
||
|
||
1. 单轮模式直接 `process_direct()`
|
||
2. 交互模式走完整 `MessageBus` 工作流
|
||
|
||
### 2.1 分支判断
|
||
|
||
`agent()` 里判断条件很简单:
|
||
|
||
1. `if message:`
|
||
- 进入单轮模式
|
||
2. `else:`
|
||
- 进入交互模式
|
||
|
||
### 2.2 交互模式总览
|
||
|
||
```text
|
||
nanobot agent
|
||
│
|
||
├─ asyncio.create_task(agent_loop.run())
|
||
├─ asyncio.create_task(_consume_outbound())
|
||
│
|
||
├─ 用户输入一行
|
||
├─ bus.publish_inbound(InboundMessage(...))
|
||
│
|
||
├─ agent_loop.run()
|
||
│ ├─ bus.consume_inbound()
|
||
│ ├─ _process_message()
|
||
│ └─ bus.publish_outbound(response)
|
||
│
|
||
├─ _consume_outbound()
|
||
│ ├─ _progress 消息? -> 按配置打印中间态
|
||
│ ├─ 当前轮正式回复? -> 收集到 turn_response
|
||
│ └─ 轮外消息? -> 直接打印
|
||
│
|
||
└─ turn_done.set() -> 当前轮结束
|
||
```
|
||
|
||
### 2.3 为什么这里要走 bus
|
||
|
||
因为 CLI 交互模式想尽量模拟真实外部渠道的行为:
|
||
|
||
1. 用户输入先进入 `inbound`
|
||
2. Agent 常驻消费
|
||
3. 回复写入 `outbound`
|
||
4. CLI 再消费 `outbound`
|
||
|
||
这样本地就能复现:
|
||
|
||
1. 进度消息
|
||
2. 工具提示
|
||
3. 轮外异步通知
|
||
4. `message` 工具主动发消息时的行为差异
|
||
|
||
|
||
## 3. Gateway 常驻模式:`nanobot gateway`
|
||
|
||
`gateway` 是常驻服务入口,它把多种“事件来源”统一接入同一个 `AgentLoop`。
|
||
|
||
这些来源包括:
|
||
|
||
1. 外部聊天渠道
|
||
2. cron 定时任务
|
||
3. heartbeat 心跳任务
|
||
|
||
### 3.1 启动总览树
|
||
|
||
```text
|
||
nanobot gateway
|
||
│
|
||
├─ load_config()
|
||
├─ MessageBus()
|
||
├─ _make_provider(config)
|
||
├─ SessionManager(workspace)
|
||
├─ CronService(jobs.json)
|
||
├─ AgentLoop(...)
|
||
├─ cron.on_job = on_cron_job
|
||
├─ ChannelManager(config, bus)
|
||
├─ HeartbeatService(...)
|
||
│
|
||
└─ asyncio.run(run())
|
||
├─ await cron.start()
|
||
├─ await heartbeat.start()
|
||
└─ await asyncio.gather(
|
||
│ agent.run(),
|
||
│ channels.start_all(),
|
||
│ )
|
||
```
|
||
|
||
### 3.2 `gateway` 启动时的关键判断
|
||
|
||
#### 3.2.1 provider 选择
|
||
|
||
和 CLI 完全一样,仍由 `_make_provider(config)` 决定。
|
||
|
||
#### 3.2.2 `ChannelManager._init_channels()`
|
||
|
||
每个渠道都有一层配置判断:
|
||
|
||
1. `config.channels.telegram.enabled == True`
|
||
- 尝试实例化 `TelegramChannel`
|
||
- ImportError 只记 warning,不中断 gateway
|
||
|
||
2. 其他渠道同理:
|
||
- whatsapp
|
||
- discord
|
||
- feishu
|
||
- mochat
|
||
- dingtalk
|
||
- email
|
||
- slack
|
||
- qq
|
||
|
||
结果:
|
||
|
||
1. 启用且成功导入:放进 `self.channels`
|
||
2. 未启用:跳过
|
||
3. 缺依赖或初始化失败:warning,继续其他渠道
|
||
|
||
#### 3.2.3 `channels.start_all()`
|
||
|
||
这里还有一个重要判断:
|
||
|
||
1. `if not self.channels`
|
||
- 结果:warning `"No channels enabled"`,然后 return
|
||
- 跳转:`asyncio.gather()` 里只剩 `agent.run()` 常驻
|
||
|
||
2. 如果存在已启用渠道
|
||
- 先创建 `_dispatch_outbound()` 任务
|
||
- 再并发启动所有 `channel.start()`
|
||
|
||
|
||
## 4. Gateway 下的三种消息来源
|
||
|
||
### 4.1 来源 A:外部聊天渠道 -> bus -> agent -> outbound -> 渠道发送
|
||
|
||
这是最标准的生产链路。
|
||
|
||
#### 4.1.1 入站:渠道收到用户消息
|
||
|
||
每个渠道实现最终都会调用 `BaseChannel._handle_message(...)`。
|
||
|
||
判断顺序:
|
||
|
||
1. `is_allowed(sender_id)`?
|
||
- `allow_from` 为空:默认允许
|
||
- `sender_id` 完整匹配 allow list:允许
|
||
- `sender_id` 含 `|`,拆开任一部分匹配:允许
|
||
- 其他情况:拒绝
|
||
|
||
2. 判断结果:
|
||
- 允许:
|
||
- 构造 `InboundMessage`
|
||
- `await bus.publish_inbound(msg)`
|
||
- 拒绝:
|
||
- 只记 warning
|
||
- 消息被丢弃
|
||
|
||
#### 4.1.2 中段:`AgentLoop.run()`
|
||
|
||
`agent.run()` 会一直循环:
|
||
|
||
1. `await self.bus.consume_inbound()`,带 1 秒 timeout
|
||
2. 拿到消息后调用 `_process_message(msg)`
|
||
3. 判断返回值
|
||
|
||
返回值分支:
|
||
|
||
1. `response is not None`
|
||
- `await bus.publish_outbound(response)`
|
||
|
||
2. `response is None and msg.channel == "cli"`
|
||
- 发一个空 `OutboundMessage`
|
||
- 作用:通知 CLI 当前轮结束
|
||
|
||
3. `_process_message()` 抛异常
|
||
- 捕获异常
|
||
- 发一条 `Sorry, I encountered an error: ...`
|
||
|
||
#### 4.1.3 出站:`ChannelManager._dispatch_outbound()`
|
||
|
||
判断顺序:
|
||
|
||
1. `msg.metadata["_progress"] == True`?
|
||
- YES:
|
||
- 如果 `_tool_hint=True` 且 `send_tool_hints=False`:丢弃
|
||
- 如果 `_tool_hint=False` 且 `send_progress=False`:丢弃
|
||
- 否则继续发送
|
||
- NO:
|
||
- 直接进入正常路由
|
||
|
||
2. `self.channels.get(msg.channel)` 是否存在?
|
||
- YES:调用对应 `channel.send(msg)`
|
||
- NO:记录 warning `"Unknown channel"`
|
||
|
||
3. 单条发送失败?
|
||
- YES:只记 error,不终止整个 dispatcher
|
||
- NO:本条发送完成
|
||
|
||
|
||
### 4.2 来源 B:cron 定时任务
|
||
|
||
gateway 启动时,会先创建 `CronService`,再把 `cron.on_job` 绑定到 `run_cron_job(...)`。
|
||
|
||
也就是说,定时器本身不直接调用模型,而是统一交给 `run_cron_job()`。
|
||
|
||
#### 4.2.1 触发顺序
|
||
|
||
```text
|
||
cron.start()
|
||
│
|
||
├─ 计时器到点
|
||
├─ CronService 选出到期 job
|
||
├─ await on_job(job)
|
||
│ └─ run_cron_job(job, agent=agent, bus=bus, ...)
|
||
│
|
||
└─ 根据 job.payload.kind 分支执行
|
||
```
|
||
|
||
#### 4.2.2 `run_cron_job()` 的关键判断
|
||
|
||
1. `job.payload.kind == "system_event"`?
|
||
- YES:
|
||
- 直接把 `job.payload.message` 当结果
|
||
- 如果 `deliver=True` 且 `to` 非空:
|
||
- `bus.publish_outbound(...)`
|
||
- 不进入 `AgentLoop.process_direct()`
|
||
|
||
2. 否则视为 `agent_turn`
|
||
- 先解析 `session_key`
|
||
- 构造 `execution_context`
|
||
- 注入 `CronActionTool`
|
||
- 调用 `agent.process_direct(...)`
|
||
- 如果 `deliver=True` 且 `to` 非空:
|
||
- 再把最终结果发到 `outbound`
|
||
|
||
#### 4.2.3 `CronActionTool` 的结果分支
|
||
|
||
模型在 cron task 内可以调用 `cron_action(...)` 给出结构化决策:
|
||
|
||
1. `none`
|
||
2. `remove`
|
||
3. `disable`
|
||
4. `complete_today`
|
||
5. `reschedule`
|
||
|
||
后续由 `CronService` 读取 `CronExecutionResult.action` 决定如何处理任务的后续调度状态。
|
||
|
||
|
||
### 4.3 来源 C:heartbeat 心跳任务
|
||
|
||
heartbeat 的入口和 cron 不同,它不走 `bus.consume_inbound()`,而是直接调用 `agent.process_direct(...)`。
|
||
|
||
#### 4.3.1 `_pick_heartbeat_target()` 的判断
|
||
|
||
选择顺序:
|
||
|
||
1. 从 `session_manager.list_sessions()` 找最近活跃会话
|
||
2. 会话 key 必须能拆出 `channel:chat_id`
|
||
3. `channel` 不能是 `cli` 或 `system`
|
||
4. `channel` 必须在 `enabled_channels` 里
|
||
|
||
结果:
|
||
|
||
1. 找到可用外部会话:
|
||
- heartbeat 结果可以回到真实外部渠道
|
||
2. 找不到:
|
||
- 回退到 `cli:direct`
|
||
|
||
#### 4.3.2 heartbeat 执行链路
|
||
|
||
1. `on_heartbeat(prompt)`
|
||
- 直接 `agent.process_direct(...)`
|
||
- `session_key="heartbeat"`
|
||
- `on_progress=_silent`
|
||
- 不向外部渠道发送中间进度
|
||
|
||
2. `on_heartbeat_notify(response)`
|
||
- 如果目标仍是 `cli`:不投递
|
||
- 否则:`bus.publish_outbound(...)`
|
||
|
||
|
||
## 5. Web 前端:当前代码真实可执行的链路
|
||
|
||
这里要分成两个概念:
|
||
|
||
1. 当前 CLI 能直接启动的 Web 后端:`nanobot web`
|
||
2. `create_app()` 代码里预留的 gateway mode:`bus + web_channel`
|
||
|
||
先说已经真实落地的 `nanobot web`。
|
||
|
||
|
||
## 6. `nanobot web`:standalone Web 后端
|
||
|
||
`nanobot web` 会调用:
|
||
|
||
```text
|
||
create_app(config=config)
|
||
```
|
||
|
||
这里没有传 `bus`,所以会进入 standalone 分支。
|
||
|
||
### 6.1 `create_app()` 的第一层判断
|
||
|
||
判断条件:
|
||
|
||
1. `if bus is None`
|
||
- YES:standalone mode
|
||
- NO:gateway mode
|
||
|
||
当前 `nanobot web` 的结果一定是:
|
||
|
||
1. 创建本地 `MessageBus`
|
||
2. 创建本地 `provider`
|
||
3. 创建本地 `SessionManager`
|
||
4. 创建本地 `CronService`
|
||
5. 创建本地 `AgentLoop`
|
||
6. `app.state.agent = agent`
|
||
7. `app.state.web_channel = None`
|
||
|
||
也就是说,Web 请求会直接落到本地 `AgentLoop.process_direct(...)`,而不是先发到 bus 再异步回传。
|
||
|
||
|
||
## 7. Standalone Web 下的三条前端入口
|
||
|
||
### 7.1 HTTP:`POST /api/chat`
|
||
|
||
前端发送:
|
||
|
||
```json
|
||
{
|
||
"message": "你好",
|
||
"session_id": "web:default",
|
||
"attachments": []
|
||
}
|
||
```
|
||
|
||
执行顺序:
|
||
|
||
1. `_resolve_attachment_paths(...)`
|
||
- 有 `attachments` 才解析本地文件路径
|
||
- 没有则返回空列表
|
||
|
||
2. 计算 `chat_id`
|
||
- `session_id` 包含 `:`:
|
||
- 取冒号后半部分
|
||
- 例如 `web:default -> default`
|
||
- 否则直接用整个 `session_id`
|
||
|
||
3. 判断 `web_channel is not None`?
|
||
- standalone 下固定是 `None`
|
||
- 所以会走 fallback 分支
|
||
|
||
4. fallback 分支:
|
||
- `agent.process_direct(...)`
|
||
- `channel="web"`
|
||
- `chat_id=解析后的 chat_id`
|
||
- `session_key=session_id`
|
||
|
||
5. 返回:
|
||
- `ChatResponse(response=..., session_id=...)`
|
||
|
||
特点:
|
||
|
||
1. 同步等待模型完整完成
|
||
2. HTTP 请求返回时已经拿到最终答案
|
||
3. 不返回实时中间进度
|
||
|
||
|
||
### 7.2 SSE:`POST /api/chat/stream`
|
||
|
||
这条链路只允许 standalone 使用。
|
||
|
||
判断条件:
|
||
|
||
1. `agent = app.state.agent`
|
||
2. `if agent is None`
|
||
- YES:说明当前是 gateway mode
|
||
- 结果:抛 400,提示 `"Streaming not available in gateway mode. Use WebSocket."`
|
||
- 跳转结束
|
||
|
||
3. `agent is not None`
|
||
- 进入 standalone SSE 分支
|
||
|
||
执行顺序:
|
||
|
||
1. 先 `yield {"type":"start"}`
|
||
2. 调用 `agent.process_direct(...)`
|
||
3. 拿到完整文本后按 20 字符一段切块
|
||
4. 按顺序 `yield {"type":"content","content": chunk}`
|
||
5. 结束时 `yield {"type":"done"}`
|
||
|
||
注意:
|
||
|
||
1. 这里不是“真正 token 级流式”
|
||
2. 而是“先拿完整答案,再假流式切块回放”
|
||
|
||
|
||
### 7.3 WebSocket:`/ws/{session_id}`
|
||
|
||
这条链路是当前 Web 前端最复杂的一条,因为它同时支持:
|
||
|
||
1. ping/pong
|
||
2. 取消委派
|
||
3. 普通消息
|
||
4. 直连模式下的结构化过程事件
|
||
|
||
#### 7.3.1 首层判断
|
||
|
||
连接建立后:
|
||
|
||
1. `await websocket.accept()`
|
||
2. `send_lock = asyncio.Lock()`
|
||
3. 判断 `web_channel is not None`
|
||
|
||
结果:
|
||
|
||
1. gateway mode:
|
||
- `web_channel.register_connection(session_id, websocket)`
|
||
2. standalone mode:
|
||
- 不注册外部 channel
|
||
- 后续直接在当前 handler 内调用 `agent.process_direct()`
|
||
|
||
#### 7.3.2 收到客户端消息后的判断
|
||
|
||
每次 `await websocket.receive_text()` 后,会按以下顺序判断:
|
||
|
||
1. JSON 可解析?
|
||
- NO:忽略,继续下一条
|
||
- YES:继续判断 `type`
|
||
|
||
2. `type == "ping"`?
|
||
- YES:立刻回 `{"type":"pong"}`
|
||
- NO:继续
|
||
|
||
3. `type == "cancel_process"`?
|
||
- YES:
|
||
- 取 `run_id`
|
||
- 判断当前是否有本地 `agent`
|
||
- 如果有:`await agent.delegation.cancel(run_id)`
|
||
- 返回 `{"type":"process_cancel_ack","run_id":...,"ok":...}`
|
||
- NO:继续
|
||
|
||
4. `type == "message"`?
|
||
- YES:进入消息处理分支
|
||
- NO:忽略
|
||
|
||
#### 7.3.3 standalone WebSocket 消息链路
|
||
|
||
当 `web_channel is None` 时:
|
||
|
||
1. 先回 `{"type":"status","status":"thinking"}`
|
||
2. 定义 `_process_sink(event)`:
|
||
- 把 `process_event_callback` 推来的结构化事件直接发给前端
|
||
- 自动补上 `session_id`
|
||
|
||
3. 调用:
|
||
|
||
```text
|
||
agent.process_direct(
|
||
...,
|
||
process_event_callback=_process_sink,
|
||
)
|
||
```
|
||
|
||
4. 执行过程中,前端会陆续收到:
|
||
- `process_run_started`
|
||
- `process_run_progress`
|
||
- `process_run_status`
|
||
- `process_run_artifact`
|
||
- `process_run_finished`
|
||
- 以及可能的最终 `message`
|
||
|
||
5. 最终再显式回:
|
||
|
||
```json
|
||
{
|
||
"type": "message",
|
||
"role": "assistant",
|
||
"content": "..."
|
||
}
|
||
```
|
||
|
||
这个模式的优势是:
|
||
|
||
1. 前端可以看到多 agent / MCP / A2A 的中间态树状过程
|
||
2. 还可以在拿到 `run_id` 后主动发 `cancel_process`
|
||
|
||
|
||
## 8. `create_app()` 预留的 gateway mode:前端如何接到 bus
|
||
|
||
这一部分是你特别关心的“前端 + gateway”链路,但要先说明一个事实:
|
||
|
||
当前仓库的 `create_app()` 已经写好了 gateway mode 的判断分支,但现有 CLI 命令没有直接把一个具体的 `web_channel` 实例传进去。
|
||
|
||
所以这一节描述的是:
|
||
|
||
1. 代码里已经定义好的分支规则
|
||
2. 如果调用方把 `bus` 和 `web_channel` 注入进来,会怎么跳转
|
||
|
||
### 8.1 gateway mode 进入条件
|
||
|
||
`create_app(...)` 的判断条件是:
|
||
|
||
1. `bus is None`
|
||
- NO:说明调用方已经提供了总线
|
||
2. 同时还可以提供 `web_channel`
|
||
|
||
结果:
|
||
|
||
1. `app.state.agent = None`
|
||
2. `app.state.web_channel = web_channel`
|
||
3. Web API 本身不创建本地 `AgentLoop`
|
||
4. 所有前端消息都应该转发给 `web_channel` / `bus`
|
||
|
||
|
||
## 9. Gateway mode 下前端的不同链路
|
||
|
||
### 9.1 HTTP:`POST /api/chat`
|
||
|
||
判断:
|
||
|
||
1. `web_channel is not None`
|
||
- YES:gateway 分支
|
||
- NO:standalone fallback
|
||
|
||
gateway 分支执行顺序:
|
||
|
||
1. `web_channel._handle_message(...)`
|
||
- 把前端消息包装成 `InboundMessage`
|
||
- 再发布到 `bus.inbound`
|
||
|
||
2. `await web_channel.notify_thinking(chat_id)`
|
||
- 立即通知前端进入 thinking 状态
|
||
|
||
3. 立刻返回:
|
||
|
||
```json
|
||
{
|
||
"status": "accepted",
|
||
"session_id": "..."
|
||
}
|
||
```
|
||
|
||
这意味着:
|
||
|
||
1. HTTP 这里不等待 LLM 完成
|
||
2. 真正结果要靠后续 WebSocket 或 `web_channel` 自己的回推机制返回给前端
|
||
|
||
|
||
### 9.2 SSE:`POST /api/chat/stream`
|
||
|
||
在 gateway mode 下,这条路会被显式禁止。
|
||
|
||
判断条件:
|
||
|
||
1. `agent is None`
|
||
- YES:说明当前是 gateway mode
|
||
- 结果:抛 400
|
||
- 原因:gateway mode 不在当前进程里直接跑 `process_direct()`,所以这里没法同步拉一条 SSE 直流
|
||
|
||
|
||
### 9.3 WebSocket:`/ws/{session_id}`
|
||
|
||
在 gateway mode 下,收到 `type="message"` 后会走:
|
||
|
||
1. `web_channel.register_connection(session_id, websocket)`
|
||
2. `web_channel._handle_message(...)`
|
||
3. `web_channel.notify_thinking(session_id)`
|
||
|
||
然后消息进入:
|
||
|
||
```text
|
||
前端 WebSocket
|
||
-> web server websocket handler
|
||
-> web_channel._handle_message(...)
|
||
-> bus.publish_inbound(...)
|
||
-> agent.run()
|
||
-> _process_message()
|
||
-> bus.publish_outbound(...)
|
||
-> web_channel / outbound consumer
|
||
-> websocket.send_text(...)
|
||
```
|
||
|
||
### 9.4 这里真正的“判断 + 跳转”关系
|
||
|
||
前端发一条 WebSocket 消息时,handler 的判断顺序是:
|
||
|
||
1. `type == "ping"`?
|
||
- YES:直接 `pong`
|
||
- NO:继续
|
||
|
||
2. `type == "cancel_process"`?
|
||
- YES:
|
||
- 如果当前有本地 `agent`,则走 `agent.delegation.cancel(run_id)`
|
||
- 如果当前没有本地 `agent`,`ok` 会是 `false`
|
||
- NO:继续
|
||
|
||
3. `type == "message"`?
|
||
- YES:继续
|
||
- NO:忽略
|
||
|
||
4. `web_channel is not None`?
|
||
- YES:gateway 分支
|
||
- `_handle_message(...)`
|
||
- `notify_thinking(...)`
|
||
- 等待异步回推
|
||
- NO:standalone 分支
|
||
- 当前协程内直接 `process_direct(...)`
|
||
- 当场把过程事件和最终答案发回客户端
|
||
|
||
|
||
## 10. 前端 + gateway 这件事,当前代码的真实状态
|
||
|
||
这一点必须明确写清楚,避免 workflow 文档误导:
|
||
|
||
1. 当前仓库中,`create_app()` 已经支持 “传入 `bus` + `web_channel`” 的 gateway mode。
|
||
2. 但是当前 CLI 命令里:
|
||
- `nanobot gateway` 启动的是常驻渠道服务
|
||
- `nanobot web` 启动的是 standalone FastAPI
|
||
3. 也就是说,仓库当前“直接可运行”的默认前端后端链路,其实是 `nanobot web` 的 standalone 模式。
|
||
4. “gateway + Web 前端共用同一 bus/web_channel” 目前在代码层属于预留集成点,而不是现成的一条 CLI 启动链。
|
||
|
||
如果未来要把这条链路真正跑起来,最少需要:
|
||
|
||
1. 在某个入口里创建共享的 `MessageBus`
|
||
2. 创建 `AgentLoop`
|
||
3. 创建具体 `WebChannel` 实现
|
||
4. 把 `bus` 和 `web_channel` 传给 `create_app(...)`
|
||
5. 同时启动:
|
||
- `agent.run()`
|
||
- Web server
|
||
- 以及 `web_channel` 对 outbound 的回推逻辑
|
||
|
||
|
||
## 11. 一页版总结
|
||
|
||
### 11.1 `nanobot agent -m "你好"`
|
||
|
||
1. 直接 `process_direct()`
|
||
2. 不走常驻 `agent.run()`
|
||
3. 不走 inbound/outbound 总线消费循环
|
||
4. 同步拿最终答案后打印退出
|
||
|
||
### 11.2 `nanobot agent` 交互模式
|
||
|
||
1. 启动 `agent.run()`
|
||
2. CLI 自己把输入写入 `bus.inbound`
|
||
3. 再从 `bus.outbound` 取结果显示
|
||
|
||
### 11.3 `nanobot gateway`
|
||
|
||
1. 常驻启动 `agent.run()`
|
||
2. 渠道、cron、heartbeat 都是消息生产者
|
||
3. 最终统一回到 `AgentLoop._process_message()`
|
||
4. 回答再由 `ChannelManager` 按 `msg.channel` 分发出去
|
||
|
||
### 11.4 `nanobot web`
|
||
|
||
1. 当前默认是 standalone
|
||
2. `/api/chat` 和 `/ws` 直接调用本地 `agent.process_direct()`
|
||
3. `/api/chat/stream` 只在 standalone 可用
|
||
|
||
### 11.5 `create_app()` 的 gateway mode
|
||
|
||
1. 判断条件是 `bus is not None` 且通常还会有 `web_channel`
|
||
2. Web 请求不直接跑本地 agent
|
||
3. 而是把前端消息丢进 bus,再由外部运行中的 `agent.run()` 处理
|
||
4. 当前仓库有这个分支,但 CLI 默认还没把它作为现成启动方式接起来
|