feat(mcp): 增强MCP服务器异常处理和错误描述
- 添加_iter_leaf_exceptions函数用于处理嵌套异常组 - 实现_describe_mcp_exception函数提供详细的MCP服务器错误信息 - 改进connect_mcp_servers中的错误处理,使用更精确的错误描述 - 在日志记录中包含更具体的错误详情 feat(outlook): 优化Outlook集成异常处理 - 添加_iter_leaf_exceptions函数用于异常处理 - 创建_coerce_outlook_mcp_exception函数统一异常转换 - 改进_call_outlook_mcp_tool中的异常捕获和处理 - 对认证令牌获取和MCP调用添加专门的超时和HTTP错误处理 feat(web): 改进Web会话定时任务结果记录 - 实现_record_cron_result_for_web_session函数 - 为Web模式下的独立定时任务执行结果提供持久化存储 - 支持将定时任务响应消息添加到目标会话中 - 确保前端可以显示定时任务执行结果
This commit is contained in:
@ -19,6 +19,37 @@ from nanobot.agent.tools.base import Tool
|
||||
from nanobot.agent.tools.registry import ToolRegistry
|
||||
|
||||
|
||||
def _iter_leaf_exceptions(exc: BaseException) -> list[BaseException]:
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
leaves: list[BaseException] = []
|
||||
for sub_exc in exc.exceptions:
|
||||
leaves.extend(_iter_leaf_exceptions(sub_exc))
|
||||
return leaves
|
||||
return [exc]
|
||||
|
||||
|
||||
def _describe_mcp_exception(exc: BaseException, *, server_name: str, url: str | None = None) -> str:
|
||||
leaves = _iter_leaf_exceptions(exc)
|
||||
target = f" ({url})" if url else ""
|
||||
|
||||
for leaf in leaves:
|
||||
if isinstance(leaf, httpx.TimeoutException):
|
||||
return f"MCP server '{server_name}' timed out while waiting for a response{target}"
|
||||
if isinstance(leaf, httpx.ConnectError):
|
||||
return f"MCP server '{server_name}' is unreachable{target}"
|
||||
if isinstance(leaf, httpx.HTTPStatusError):
|
||||
return f"MCP server '{server_name}' returned HTTP {leaf.response.status_code}{target}"
|
||||
if isinstance(leaf, httpx.HTTPError):
|
||||
detail = str(leaf).strip() or leaf.__class__.__name__
|
||||
return f"MCP server '{server_name}' HTTP error{target}: {detail}"
|
||||
|
||||
detail_source = leaves[0] if leaves else exc
|
||||
detail = str(detail_source).strip() or detail_source.__class__.__name__
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
return f"MCP server '{server_name}' failed: {detail_source.__class__.__name__}: {detail}"
|
||||
return detail
|
||||
|
||||
|
||||
class MCPToolWrapper(Tool):
|
||||
"""把单个 MCP server tool 包装成 nanobot Tool。"""
|
||||
|
||||
@ -340,7 +371,12 @@ async def connect_mcp_servers(
|
||||
)
|
||||
except Exception as e:
|
||||
# 单个 server 失败不影响其他 server 继续连;错误写进 report 供 UI 展示。
|
||||
error_detail = _describe_mcp_exception(
|
||||
e,
|
||||
server_name=name,
|
||||
url=str(getattr(cfg, "url", "") or "").strip() or None,
|
||||
)
|
||||
report[name]["status"] = "error"
|
||||
report[name]["last_error"] = str(e)
|
||||
logger.error("MCP server '{}': failed to connect: {}", name, e)
|
||||
report[name]["last_error"] = error_detail
|
||||
logger.error("MCP server '{}': failed to connect: {}", name, error_detail)
|
||||
return report
|
||||
|
||||
@ -31,6 +31,38 @@ class OutlookIntegrationError(RuntimeError):
|
||||
"""Raised when the Outlook integration backend is unavailable or misconfigured."""
|
||||
|
||||
|
||||
def _iter_leaf_exceptions(exc: BaseException) -> list[BaseException]:
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
leaves: list[BaseException] = []
|
||||
for sub_exc in exc.exceptions:
|
||||
leaves.extend(_iter_leaf_exceptions(sub_exc))
|
||||
return leaves
|
||||
return [exc]
|
||||
|
||||
|
||||
def _coerce_outlook_mcp_exception(exc: BaseException, *, url: str) -> OutlookIntegrationError:
|
||||
if isinstance(exc, OutlookIntegrationError):
|
||||
return exc
|
||||
|
||||
leaves = _iter_leaf_exceptions(exc)
|
||||
for leaf in leaves:
|
||||
if isinstance(leaf, httpx.TimeoutException):
|
||||
return OutlookIntegrationError(f"Outlook MCP 请求超时:{url}")
|
||||
if isinstance(leaf, httpx.ConnectError):
|
||||
return OutlookIntegrationError(f"Outlook MCP 无法连接:{url}")
|
||||
if isinstance(leaf, httpx.HTTPStatusError):
|
||||
return OutlookIntegrationError(f"Outlook MCP 返回 HTTP {leaf.response.status_code}:{url}")
|
||||
if isinstance(leaf, httpx.HTTPError):
|
||||
detail = str(leaf).strip() or leaf.__class__.__name__
|
||||
return OutlookIntegrationError(f"Outlook MCP 网络错误:{detail}")
|
||||
|
||||
detail_source = leaves[0] if leaves else exc
|
||||
detail = str(detail_source).strip() or detail_source.__class__.__name__
|
||||
return OutlookIntegrationError(
|
||||
f"Outlook MCP 调用失败:{detail_source.__class__.__name__}: {detail}"
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OutlookDefaults:
|
||||
"""Default values exposed to the web setup form."""
|
||||
@ -187,32 +219,43 @@ async def _call_outlook_mcp_tool(
|
||||
from mcp import ClientSession, types
|
||||
from mcp.client.streamable_http import streamable_http_client
|
||||
|
||||
url = _outlook_mcp_url(config)
|
||||
backend_id = _require_backend_identity(config)
|
||||
client = _authz_client(config)
|
||||
token_response = await client.issue_token(
|
||||
client_id=config.backend_identity.client_id,
|
||||
client_secret=config.backend_identity.client_secret,
|
||||
audience=f"mcp:{OUTLOOK_SERVER_ID}",
|
||||
scopes=scopes or ["list_tools", f"tool:{tool_name}"],
|
||||
)
|
||||
try:
|
||||
token_response = await client.issue_token(
|
||||
client_id=config.backend_identity.client_id,
|
||||
client_secret=config.backend_identity.client_secret,
|
||||
audience=f"mcp:{OUTLOOK_SERVER_ID}",
|
||||
scopes=scopes or ["list_tools", f"tool:{tool_name}"],
|
||||
)
|
||||
except httpx.TimeoutException as exc:
|
||||
raise OutlookIntegrationError("AuthZ token 请求超时。") from exc
|
||||
except httpx.HTTPError as exc:
|
||||
detail = str(exc).strip() or exc.__class__.__name__
|
||||
raise OutlookIntegrationError(f"AuthZ token 获取失败:{detail}") from exc
|
||||
|
||||
access_token = str(token_response.get("access_token") or "").strip()
|
||||
if not access_token:
|
||||
raise OutlookIntegrationError("Failed to obtain an Outlook MCP access token.")
|
||||
|
||||
async with AsyncExitStack() as stack:
|
||||
http_client = await stack.enter_async_context(
|
||||
httpx.AsyncClient(
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
follow_redirects=True,
|
||||
trust_env=False,
|
||||
try:
|
||||
async with AsyncExitStack() as stack:
|
||||
http_client = await stack.enter_async_context(
|
||||
httpx.AsyncClient(
|
||||
headers={"Authorization": f"Bearer {access_token}"},
|
||||
follow_redirects=True,
|
||||
trust_env=False,
|
||||
)
|
||||
)
|
||||
)
|
||||
read, write, _ = await stack.enter_async_context(
|
||||
streamable_http_client(_outlook_mcp_url(config), http_client=http_client)
|
||||
)
|
||||
session = await stack.enter_async_context(ClientSession(read, write))
|
||||
await session.initialize()
|
||||
result = await session.call_tool(tool_name, arguments=arguments)
|
||||
read, write, _ = await stack.enter_async_context(
|
||||
streamable_http_client(url, http_client=http_client)
|
||||
)
|
||||
session = await stack.enter_async_context(ClientSession(read, write))
|
||||
await session.initialize()
|
||||
result = await session.call_tool(tool_name, arguments=arguments)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise _coerce_outlook_mcp_exception(exc, url=url) from exc
|
||||
|
||||
parts: list[str] = []
|
||||
for block in result.content:
|
||||
|
||||
@ -540,6 +540,40 @@ def _infer_cron_route_from_session_key(session_key: str | None) -> tuple[str | N
|
||||
return channel, chat_id
|
||||
|
||||
|
||||
def _record_cron_result_for_web_session(
|
||||
*,
|
||||
session_manager: SessionManager,
|
||||
job: CronJob,
|
||||
result: CronExecutionResult,
|
||||
) -> str | None:
|
||||
"""Persist standalone web cron output so the frontend can surface it."""
|
||||
target_session_key = _resolve_cron_session_key(job)
|
||||
if not target_session_key.startswith("web:"):
|
||||
return None
|
||||
|
||||
# agent_turn jobs already write their own history via AgentLoop.process_direct().
|
||||
if job.payload.kind == "agent_turn":
|
||||
return target_session_key
|
||||
|
||||
# reminder/system_event jobs bypass the agent loop, so standalone web mode
|
||||
# must append the final message into the target session explicitly.
|
||||
if job.payload.kind != "system_event" or not job.payload.deliver or not result.response:
|
||||
return None
|
||||
|
||||
session = session_manager.get_or_create(target_session_key)
|
||||
session.add_message(
|
||||
"assistant",
|
||||
result.response,
|
||||
metadata={
|
||||
"source": "cron",
|
||||
"job_id": job.id,
|
||||
"job_name": job.name,
|
||||
},
|
||||
)
|
||||
session_manager.save(session)
|
||||
return target_session_key
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# App factory
|
||||
# ============================================================================
|
||||
@ -635,8 +669,12 @@ def create_app(
|
||||
default_channel="web",
|
||||
default_chat_id="default",
|
||||
)
|
||||
target_session_key = _resolve_cron_session_key(job)
|
||||
if job.payload.kind == "agent_turn" and target_session_key.startswith("web:"):
|
||||
target_session_key = _record_cron_result_for_web_session(
|
||||
session_manager=session_manager,
|
||||
job=job,
|
||||
result=result,
|
||||
)
|
||||
if target_session_key:
|
||||
await websocket_broadcaster.broadcast({
|
||||
"type": "session_updated",
|
||||
"session_id": target_session_key,
|
||||
|
||||
Reference in New Issue
Block a user