import asyncio from contextlib import suppress from typing import Any from beaver.engine import AgentLoop, AgentRunResult, EngineLoader def _run_result(run_id: str, output_text: str) -> AgentRunResult: return AgentRunResult( session_id="web:test", run_id=run_id, output_text=output_text, finish_reason="stop", tool_iterations=0, ) def test_running_loop_handles_reentrant_submit_direct(tmp_path) -> None: async def run_case() -> None: loop = AgentLoop(loader=EngineLoader(workspace=tmp_path)) calls: list[str] = [] async def fake_process_direct(task: str, **kwargs: Any) -> AgentRunResult: calls.append(task) if task == "outer": return await loop.submit_direct("inner", session_id="web:test") return _run_result(task, "inner completed") loop._process_direct_impl = fake_process_direct # type: ignore[method-assign] loop_task = asyncio.create_task(loop.run()) await asyncio.sleep(0) try: result = await asyncio.wait_for(loop.submit_direct("outer", session_id="web:test"), timeout=1) finally: await loop.stop() with suppress(asyncio.TimeoutError): await asyncio.wait_for(loop_task, timeout=1) if not loop_task.done(): loop_task.cancel() with suppress(asyncio.CancelledError): await loop_task assert result.output_text == "inner completed" assert calls == ["outer", "inner"] asyncio.run(run_case())