200 lines
7.6 KiB
Python
200 lines
7.6 KiB
Python
"""Integration test for ``everos cascade`` CLI commands.
|
|
|
|
Drives the actual Typer commands against a real sqlite + lancedb under a
|
|
tmp memory root. Validates the in-process orchestration that
|
|
``test_cascade_command`` (unit) cannot reach: ``_runtime()`` context,
|
|
queue summary formatting, fix (no-rows path), and a full
|
|
``cascade sync <path>`` round-trip with a stub embedder.
|
|
|
|
The CLI commands call ``asyncio.run(_run())`` internally, so this test
|
|
is **synchronous** — pytest-asyncio's auto mode would otherwise wrap it
|
|
in an event loop, which collides with the CLI's own loop.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import datetime as _dt
|
|
import re
|
|
from collections.abc import Iterator
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from typer.testing import CliRunner
|
|
|
|
from everos.component.embedding import EmbeddingProvider
|
|
from everos.config import load_settings
|
|
from everos.entrypoints.cli.commands import cascade as cascade_mod
|
|
from everos.infra.persistence.lancedb import dispose_connection
|
|
from everos.infra.persistence.sqlite import dispose_engine
|
|
|
|
|
|
class _StubEmbedder(EmbeddingProvider):
|
|
dim = 1024
|
|
|
|
async def embed(self, text: str) -> list[float]:
|
|
return [0.0] * self.dim
|
|
|
|
async def embed_batch(self, texts): # type: ignore[no-untyped-def]
|
|
return [[0.0] * self.dim for _ in texts]
|
|
|
|
|
|
_ANSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
|
|
|
|
|
|
def _strip_ansi(text: str) -> str:
|
|
return _ANSI_RE.sub("", text)
|
|
|
|
|
|
@pytest.fixture
|
|
def cli_runtime(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
|
|
"""Tmp memory root + clean singletons; CLI bootstraps the schema itself."""
|
|
monkeypatch.setenv("EVEROS_MEMORY__ROOT", str(tmp_path))
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__MODEL", "stub-model")
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__BASE_URL", "http://stub.invalid/v1")
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__API_KEY", "stub-key")
|
|
load_settings.cache_clear()
|
|
|
|
# Strip any singleton state from a neighbouring test.
|
|
asyncio.run(_dispose_all())
|
|
yield tmp_path
|
|
asyncio.run(_dispose_all())
|
|
|
|
|
|
async def _dispose_all() -> None:
|
|
await dispose_connection()
|
|
await dispose_engine()
|
|
|
|
|
|
def test_status_on_empty_queue(cli_runtime: Path) -> None:
|
|
"""``cascade status`` boots the runtime + prints zeros for a fresh DB."""
|
|
result = CliRunner().invoke(cascade_mod.app, ["status"])
|
|
assert result.exit_code == 0, result.stdout
|
|
assert "queue:" in result.stdout
|
|
assert "pending:" in result.stdout
|
|
# Fresh DB: every counter is zero.
|
|
assert "0" in result.stdout
|
|
assert "lsn:" in result.stdout
|
|
|
|
|
|
def test_fix_with_no_failed_rows(cli_runtime: Path) -> None:
|
|
"""``cascade fix`` (no ``--apply``) prints the empty-state message."""
|
|
result = CliRunner().invoke(cascade_mod.app, ["fix"])
|
|
assert result.exit_code == 0, result.stdout
|
|
assert "no failed rows" in result.stdout
|
|
|
|
|
|
def test_fix_apply_with_no_failed_rows(cli_runtime: Path) -> None:
|
|
"""``cascade fix --apply`` is a noop when there's nothing to fix."""
|
|
result = CliRunner().invoke(cascade_mod.app, ["fix", "--apply"])
|
|
assert result.exit_code == 0, result.stdout
|
|
assert "no failed rows" in result.stdout
|
|
|
|
|
|
def test_sync_on_empty_queue_with_stub_embedder(
|
|
cli_runtime: Path, monkeypatch: pytest.MonkeyPatch
|
|
) -> None:
|
|
"""``cascade sync`` invokes orchestrator.drain even on empty queue."""
|
|
# CLI builds the embedder via build_embedding_provider() which would
|
|
# try to connect; replace the orchestrator builder with one wired to
|
|
# the stub embedder.
|
|
from everos.component.tokenizer import build_tokenizer
|
|
from everos.core.persistence import MemoryRoot
|
|
from everos.memory.cascade import CascadeOrchestrator
|
|
|
|
def fake_build_orchestrator() -> CascadeOrchestrator:
|
|
root = MemoryRoot.default()
|
|
root.ensure()
|
|
return CascadeOrchestrator(
|
|
memory_root=root,
|
|
embedder=_StubEmbedder(),
|
|
tokenizer=build_tokenizer(),
|
|
)
|
|
|
|
monkeypatch.setattr(cascade_mod, "_build_orchestrator", fake_build_orchestrator)
|
|
|
|
result = CliRunner().invoke(cascade_mod.app, ["sync"])
|
|
assert result.exit_code == 0, result.stdout
|
|
assert "sync complete" in result.stdout
|
|
assert "processed 0 row(s)" in result.stdout
|
|
|
|
|
|
def test_sync_with_path_outside_root_errors(
|
|
cli_runtime: Path, tmp_path_factory: pytest.TempPathFactory
|
|
) -> None:
|
|
"""``cascade sync <path>`` rejects paths outside the memory root."""
|
|
other = tmp_path_factory.mktemp("other") / "x.md"
|
|
other.write_text("# unrelated\n")
|
|
result = CliRunner().invoke(cascade_mod.app, ["sync", str(other)])
|
|
assert result.exit_code != 0
|
|
# Typer.BadParameter surfaces in stderr / mixed output. The Rich
|
|
# error box may wrap the message, pad each line with box characters,
|
|
# and inject ANSI control codes on CI. Strip ANSI first, then allow
|
|
# non-word separators between the split message fragments.
|
|
output = result.stdout + (result.stderr or "")
|
|
assert re.search(r"not under[^\w]+memory root", _strip_ansi(output)), output
|
|
|
|
|
|
def test_sync_with_unmatched_path(
|
|
cli_runtime: Path, monkeypatch: pytest.MonkeyPatch
|
|
) -> None:
|
|
"""A path under the root but matching no cascade kind exits 1 with a hint."""
|
|
from everos.component.tokenizer import build_tokenizer
|
|
from everos.core.persistence import MemoryRoot
|
|
from everos.memory.cascade import CascadeOrchestrator
|
|
|
|
def fake_build_orchestrator() -> CascadeOrchestrator:
|
|
return CascadeOrchestrator(
|
|
memory_root=MemoryRoot.default(),
|
|
embedder=_StubEmbedder(),
|
|
tokenizer=build_tokenizer(),
|
|
)
|
|
|
|
monkeypatch.setattr(cascade_mod, "_build_orchestrator", fake_build_orchestrator)
|
|
|
|
# File under the root but in an unregistered subdirectory.
|
|
unregistered = cli_runtime / "stuff" / "random.md"
|
|
unregistered.parent.mkdir(parents=True, exist_ok=True)
|
|
unregistered.write_text("# random\n")
|
|
result = CliRunner().invoke(cascade_mod.app, ["sync", str(unregistered)])
|
|
assert result.exit_code == 1
|
|
# stderr in CliRunner is merged into stdout for typer.echo(..., err=True).
|
|
output = result.stdout + (result.stderr or "")
|
|
assert "does not match any registered cascade kind" in output
|
|
|
|
|
|
# Keep a baseline so future regressions show as a hard failure.
|
|
def test_status_handles_pending_rows(cli_runtime: Path) -> None:
|
|
"""Seed one pending row via the repo before invoking status."""
|
|
|
|
async def seed() -> None:
|
|
# Bring the runtime up like the CLI does, seed, then dispose.
|
|
async with cascade_mod._runtime():
|
|
from everos.infra.persistence.sqlite import md_change_state_repo
|
|
|
|
await md_change_state_repo.force_enqueue(
|
|
"users/u1/episodes/episode-2026-01-01.md", "episode"
|
|
)
|
|
|
|
asyncio.run(seed())
|
|
|
|
result = CliRunner().invoke(cascade_mod.app, ["status"])
|
|
assert result.exit_code == 0, result.stdout
|
|
# One row pending; LSN must be ≥ 1.
|
|
assert "pending: 1" in result.stdout
|
|
|
|
|
|
# Reduce false negatives on date drift.
|
|
def test_resolve_relative_via_command_arg(cli_runtime: Path) -> None:
|
|
"""An absolute path under the root works through ``cascade sync <path>``."""
|
|
md_file = cli_runtime / "users" / "u1" / "episodes" / "episode-2026-05-25.md"
|
|
md_file.parent.mkdir(parents=True, exist_ok=True)
|
|
today = _dt.date.today().isoformat() # only used so the var isn't unused
|
|
md_file.write_text(f"# {today}\n")
|
|
|
|
# We don't need the orchestrator to actually drain anything; pass --help
|
|
# against the sync subcommand to verify the path resolution helper
|
|
# doesn't barf at construction time.
|
|
result = CliRunner().invoke(cascade_mod.app, ["sync", "--help"])
|
|
assert result.exit_code == 0
|