"""Projection of hidden Task/team events into frontend process streams.""" from __future__ import annotations from datetime import datetime, timezone from typing import Any class SessionProcessProjector: def __init__(self, session_manager: Any, run_memory_store: Any) -> None: self.session_manager = session_manager self.run_memory_store = run_memory_store def project(self, session_id: str) -> dict[str, Any]: records = self.session_manager.get_event_records(session_id) run_records = {record.run_id: record for record in self.run_memory_store.list_runs()} runs: dict[str, dict[str, Any]] = {} events: list[dict[str, Any]] = [] artifacts: list[dict[str, Any]] = [] def add_event( *, event_id: str, run_id: str, kind: str, actor_type: str, actor_id: str, actor_name: str, text: str, created_at: str, status: str | None = None, parent_run_id: str | None = None, metadata: dict[str, Any] | None = None, ) -> None: events.append( { "event_id": event_id, "run_id": run_id, "parent_run_id": parent_run_id, "kind": kind, "actor_type": actor_type, "actor_id": actor_id, "actor_name": actor_name, "text": text, "status": status, "metadata": dict(metadata or {}), "created_at": created_at, } ) for record in records: payload = dict(record.event_payload or {}) task_id = payload.get("task_id") if not task_id: continue attempt_index = int(payload.get("attempt_index") or 1) root_run_id = f"task:{task_id}:attempt:{attempt_index}" created_at = _timestamp(record.timestamp) root = runs.setdefault( root_run_id, { "run_id": root_run_id, "parent_run_id": None, "session_id": session_id, "actor_type": "system", "actor_id": "task", "actor_name": "Task Planner", "title": f"Task {task_id[:8]} attempt {attempt_index}", "source": "task_mode", "status": "running", "started_at": created_at, "metadata": {"task_id": task_id, "attempt_index": attempt_index}, }, ) if record.event_type == "task_execution_planned": strategy = payload.get("strategy") or "single" node_ids = payload.get("node_ids") or [] root["title"] = f"{payload.get('plan_mode', 'single')} plan: {strategy}" root["summary"] = payload.get("reason") or "" root["metadata"] = { **root.get("metadata", {}), "plan_mode": payload.get("plan_mode"), "strategy": payload.get("strategy"), "node_ids": node_ids, "skill_queries": payload.get("skill_queries") or [], "selected_skill_names": payload.get("selected_skill_names") or [], "ephemeral_guidance_ids": payload.get("ephemeral_guidance_ids") or [], "skill_resolution_report": payload.get("skill_resolution_report") or [], "fallback_error": payload.get("fallback_error"), } add_event( event_id=_event_id(record, "planned"), run_id=root_run_id, kind="run_started", actor_type="system", actor_id="task", actor_name="Task Planner", text=f"Planned {payload.get('plan_mode')} execution via {strategy}. {payload.get('reason') or ''}".strip(), created_at=created_at, status="running", metadata=root["metadata"], ) elif record.event_type in {"task_team_run_completed", "task_team_run_failed"}: team_success = bool(payload.get("team_success")) root["status"] = "running" root["metadata"] = { **root.get("metadata", {}), "team_success": team_success, "team_run_ids": payload.get("team_run_ids") or [], "team_error": payload.get("error"), } add_event( event_id=_event_id(record, "team"), run_id=root_run_id, kind="run_status", actor_type="system", actor_id="team", actor_name="Task Team", text=payload.get("error") or ("Team completed" if team_success else "Team completed with failed nodes"), created_at=created_at, status="done" if team_success else "error", metadata=dict(payload), ) node_results = payload.get("node_results") or [] for item in node_results: if not isinstance(item, dict): continue node_run_id = item.get("run_id") or f"{root_run_id}:node:{item.get('node_id')}" status = "done" if item.get("success") else "error" if item.get("finish_reason") == "blocked": status = "waiting" run_record = run_records.get(str(node_run_id)) runs[str(node_run_id)] = { "run_id": str(node_run_id), "parent_run_id": root_run_id, "session_id": run_record.session_id if run_record is not None else session_id, "actor_type": "agent", "actor_id": str(item.get("node_id") or "sub-agent"), "actor_name": str(item.get("node_id") or "Sub-agent"), "title": str(item.get("node_id") or "Sub-agent"), "source": "task_team", "status": status, "started_at": run_record.started_at if run_record is not None else created_at, "finished_at": run_record.ended_at if run_record is not None else created_at, "summary": _truncate(str(item.get("output_text") or item.get("error") or "")), "metadata": { "task_id": task_id, "attempt_index": attempt_index, "node_id": item.get("node_id"), "skill_query": item.get("skill_query"), "selected_skill_names": item.get("selected_skill_names") or [], "ephemeral_skill_names": item.get("ephemeral_skill_names") or [], "ephemeral_guidance_id": item.get("ephemeral_guidance_id"), "ephemeral_guidance_name": item.get("ephemeral_guidance_name"), "ephemeral_used": bool(item.get("ephemeral_used")), "finish_reason": item.get("finish_reason"), "error": item.get("error"), }, } guidance_id = item.get("ephemeral_guidance_id") if guidance_id: guidance_name = str(item.get("ephemeral_guidance_name") or guidance_id) artifacts.append( { "artifact_id": f"{node_run_id}:ephemeral-guidance:{guidance_id}", "run_id": str(node_run_id), "actor_type": "agent", "actor_id": str(item.get("node_id") or "sub-agent"), "actor_name": str(item.get("node_id") or "Sub-agent"), "title": f"Ephemeral guidance: {guidance_name}", "artifact_type": "markdown", "content": ( f"# Ephemeral guidance\n\n" f"- Guidance: {guidance_name}\n" f"- Guidance ID: {guidance_id}\n" f"- Scope: current delegated sub-agent run only" ), "metadata": { "task_id": task_id, "attempt_index": attempt_index, "node_id": item.get("node_id"), "ephemeral_guidance_id": guidance_id, "ephemeral_guidance_name": guidance_name, "ephemeral_skill_names": item.get("ephemeral_skill_names") or [], }, "created_at": created_at, } ) add_event( event_id=f"{_event_id(record, 'node')}:{item.get('node_id')}", run_id=str(node_run_id), parent_run_id=root_run_id, kind="run_finished", actor_type="agent", actor_id=str(item.get("node_id") or "sub-agent"), actor_name=str(item.get("node_id") or "Sub-agent"), text=_truncate(str(item.get("output_text") or item.get("error") or "")), created_at=created_at, status=status, metadata=dict(item), ) elif record.event_type == "task_synthesis_completed": main_run_id = str(payload.get("main_run_id") or "") if main_run_id: run_record = run_records.get(main_run_id) runs[main_run_id] = { "run_id": main_run_id, "parent_run_id": root_run_id, "session_id": run_record.session_id if run_record is not None else session_id, "actor_type": "agent", "actor_id": "main-agent", "actor_name": "Main Agent", "title": "Final synthesis", "source": "task_synthesis", "status": "done" if (run_record is None or run_record.success) else "error", "started_at": run_record.started_at if run_record is not None else created_at, "finished_at": run_record.ended_at if run_record is not None else created_at, "summary": _truncate(run_record.task_text if run_record is not None else ""), "metadata": {"task_id": task_id, "attempt_index": attempt_index}, } add_event( event_id=_event_id(record, "synthesis"), run_id=main_run_id, parent_run_id=root_run_id, kind="run_finished", actor_type="agent", actor_id="main-agent", actor_name="Main Agent", text="Main Agent synthesized the final user-facing answer.", created_at=created_at, status="done", metadata=dict(payload), ) elif record.event_type == "task_validation_snapshotted": validation = payload.get("validation_result") if isinstance(payload.get("validation_result"), dict) else {} accepted = bool(validation.get("accepted")) root["status"] = "done" if accepted or attempt_index == 2 else "waiting" root["finished_at"] = created_at if root["status"] == "done" else None add_event( event_id=_event_id(record, "validation"), run_id=record.run_id or root_run_id, parent_run_id=root_run_id if record.run_id else None, kind="run_status", actor_type="system", actor_id="validator", actor_name="Validator", text=( f"Validation {'passed' if accepted else 'failed'} " f"(score={validation.get('score')})." + (" Retry scheduled." if payload.get("retry_scheduled") else "") ), created_at=created_at, status="done" if accepted else "error", metadata=dict(payload), ) return { "runs": sorted(runs.values(), key=lambda item: item.get("started_at") or ""), "events": sorted(events, key=lambda item: item.get("created_at") or ""), "artifacts": sorted(artifacts, key=lambda item: item.get("created_at") or ""), "agents": [], } def _timestamp(value: float | None) -> str: if value is None: return datetime.now(timezone.utc).isoformat() return datetime.fromtimestamp(float(value), tz=timezone.utc).isoformat() def _event_id(record: Any, suffix: str) -> str: return f"session-event:{record.message_id or record.timestamp}:{suffix}" def _truncate(text: str, limit: int = 800) -> str: cleaned = text.strip() if len(cleaned) <= limit: return cleaned return cleaned[: limit - 1] + "..."