"""结构化过程事件辅助工具。 这个模块的作用是把“运行中的中间状态”从底层执行逻辑安全地带到上层 UI: 1. 用 `ContextVar` 记录当前异步上下文是否挂了事件 sink; 2. 用单独的 run_id 上下文把父子流程串起来; 3. 让委派、MCP、A2A 等模块只管发事件,不需要知道 WebSocket/SSE 细节。 """ from __future__ import annotations import uuid from contextlib import contextmanager from contextvars import ContextVar from datetime import datetime, timezone from typing import Any, Awaitable, Callable ProcessEvent = dict[str, Any] ProcessEventSink = Callable[[ProcessEvent], Awaitable[None]] # `_sink_var` 保存“当前异步上下文的事件接收器”。 # 这样可以避免把回调一层层显式往下传,同时又不会污染并发请求之间的上下文。 _sink_var: ContextVar[ProcessEventSink | None] = ContextVar("process_event_sink", default=None) # `_run_id_var` 保存“当前流程的父 run_id”。 # 子流程发事件时可以把它挂到 `parent_run_id`,供前端拼接树状执行视图。 _run_id_var: ContextVar[str | None] = ContextVar("process_current_run_id", default=None) def new_run_id(prefix: str = "run") -> str: """生成一个短且可读的运行 ID。""" # 只截取 8 位十六进制是为了兼顾: # 1. 日志 / WebSocket 里更短、更容易肉眼追踪; # 2. 同一进程内短期冲突概率仍足够低。 return f"{prefix}-{uuid.uuid4().hex[:8]}" def utc_now_iso() -> str: """返回带 `Z` 后缀的 UTC ISO8601 时间戳。""" return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") @contextmanager def process_event_sink(sink: ProcessEventSink | None): """为当前异步上下文临时绑定一个事件 sink。""" # `ContextVar.set()` 会返回 token,退出时要 reset,避免泄漏到后续请求。 token = _sink_var.set(sink) try: yield finally: _sink_var.reset(token) @contextmanager def process_run_context(run_id: str | None): """为当前异步上下文绑定一个逻辑父 run_id。""" token = _run_id_var.set(run_id) try: yield finally: _run_id_var.reset(token) def current_process_run_id() -> str | None: """读取当前上下文里绑定的 run_id。""" return _run_id_var.get() def has_process_event_sink() -> bool: """判断当前上下文是否具备过程事件接收能力。""" return _sink_var.get() is not None async def emit_process_event(event_type: str, **payload: Any) -> None: """在存在 sink 时发出一个结构化过程事件。""" sink = _sink_var.get() # 没有 sink 说明当前调用链不关心中间态,例如纯 CLI 单轮场景,直接静默跳过。 if sink is None: return # `created_at` 允许调用方覆盖;未传时统一补 UTC 时间,方便前端排序。 event: ProcessEvent = { "type": event_type, "created_at": payload.pop("created_at", utc_now_iso()), **payload, } await sink(event)