md-first memory extraction framework for AI agents. Markdown is the single source of truth; SQLite holds state and LanceDB provides the rebuildable vector + BM25 + scalar index. The codebase follows a single-direction DDD layering (entrypoints -> service -> memory -> infra, with component / core / config cross-cutting) enforced by import-linter. Engineering surface: - Coding conventions in .claude/rules/ (path-scoped) and workflows in .claude/skills/ (/commit, /new-branch, /pr). - GitHub Actions CI runs make lint + test + integration; pre-commit mirrors the gates locally (ruff, hygiene hooks, gitlint commit-msg). - Commit messages follow Conventional Commits, enforced by gitlint. - make lint also enforces datetime two-zone discipline and OpenAPI drift.
317 lines
10 KiB
Python
317 lines
10 KiB
Python
"""Strict md <-> lancedb consistency across all 4 daily-log kinds.
|
|
|
|
For each registered daily-log kind, seed N entries via the kind's
|
|
writer, wait for the cascade to drain, then assert exact equality
|
|
between md state and LanceDB state:
|
|
|
|
* ``frontmatter.entry_count == N``
|
|
* number of ``<!-- entry:... -->`` blocks == N
|
|
* ``lance_repo.count_rows(md_path=...) == N``
|
|
* lance ``entry_id`` set == md ``entry_id`` set
|
|
|
|
This is the strict counterpart to the loose ``>=`` assertions in
|
|
:mod:`test_add_flush_user_pipeline_e2e` (which can't be exact because
|
|
LLM output is non-deterministic).
|
|
|
|
Skill / profile are single-file (not daily-log) kinds and are covered
|
|
by the e2e pipeline tests where the OME drives real LLM emissions.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import datetime as _dt
|
|
from collections.abc import AsyncIterator, Callable, Mapping
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from sqlmodel import SQLModel
|
|
|
|
from everos.component.embedding import EmbeddingProvider
|
|
from everos.component.tokenizer import build_tokenizer
|
|
from everos.core.persistence import MarkdownReader, MemoryRoot
|
|
from everos.infra.persistence.lancedb import (
|
|
agent_case_repo,
|
|
atomic_fact_repo,
|
|
dispose_connection,
|
|
ensure_business_indexes,
|
|
episode_repo,
|
|
foresight_repo,
|
|
)
|
|
from everos.infra.persistence.lancedb.lancedb_manager import get_table
|
|
from everos.infra.persistence.lancedb.tables.agent_case import AgentCase
|
|
from everos.infra.persistence.lancedb.tables.atomic_fact import AtomicFact
|
|
from everos.infra.persistence.lancedb.tables.episode import Episode
|
|
from everos.infra.persistence.lancedb.tables.foresight import Foresight
|
|
from everos.infra.persistence.markdown import (
|
|
AgentCaseWriter,
|
|
AtomicFactWriter,
|
|
EpisodeWriter,
|
|
ForesightWriter,
|
|
)
|
|
from everos.infra.persistence.sqlite import (
|
|
dispose_engine,
|
|
get_engine,
|
|
md_change_state_repo,
|
|
)
|
|
from everos.memory.cascade import CascadeConfig, CascadeOrchestrator
|
|
from everos.memory.cascade.registry import KIND_REGISTRY
|
|
from tests._consistency_assertions import _daily_log_sha_for_entry
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_lancedb_write_locks() -> None:
|
|
"""ClassVar lock pool reset; see test_repository.py for rationale."""
|
|
from everos.core.persistence.lancedb.repository import LanceRepoBase
|
|
|
|
LanceRepoBase._reset_locks_for_tests()
|
|
|
|
|
|
class _StubEmbedder(EmbeddingProvider):
|
|
dim = 1024
|
|
|
|
async def embed(self, text: str) -> list[float]:
|
|
return [0.0] * self.dim
|
|
|
|
async def embed_batch(self, texts): # type: ignore[no-untyped-def]
|
|
return [[0.0] * self.dim for _ in texts]
|
|
|
|
|
|
@pytest.fixture
|
|
async def cascade_runtime(
|
|
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
|
) -> AsyncIterator[MemoryRoot]:
|
|
monkeypatch.setenv("EVEROS_MEMORY__ROOT", str(tmp_path))
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__MODEL", "stub-model")
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__BASE_URL", "http://stub.invalid/v1")
|
|
monkeypatch.setenv("EVEROS_EMBEDDING__API_KEY", "stub-key")
|
|
await dispose_connection()
|
|
await dispose_engine()
|
|
engine = get_engine()
|
|
async with engine.begin() as conn:
|
|
await conn.run_sync(SQLModel.metadata.create_all)
|
|
await ensure_business_indexes()
|
|
yield MemoryRoot.default()
|
|
await dispose_connection()
|
|
await dispose_engine()
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class _DailyLogKindCase:
|
|
"""A single registered daily-log kind, packaged for parametrization."""
|
|
|
|
name: str
|
|
scope: str # "users" | "agents"
|
|
dir_name: str
|
|
file_prefix: str
|
|
writer_factory: Callable[[MemoryRoot], Any]
|
|
repo: Any
|
|
table_cls: type
|
|
build_item: Callable[[str, int], tuple[Mapping[str, object], Mapping[str, str]]]
|
|
|
|
|
|
def _af_item(scope_id: str, j: int):
|
|
return (
|
|
{
|
|
"owner_id": scope_id,
|
|
"session_id": f"s_{j}",
|
|
"timestamp": "2026-05-19T07:04:26+00:00",
|
|
"parent_id": f"mc_{j}",
|
|
"sender_ids": [scope_id],
|
|
},
|
|
{"Fact": f"af fact body {j}"},
|
|
)
|
|
|
|
|
|
def _ep_item(scope_id: str, j: int):
|
|
return (
|
|
{
|
|
"owner_id": scope_id,
|
|
"session_id": f"s_{j}",
|
|
"timestamp": "2026-05-19T07:04:26+00:00",
|
|
"parent_id": f"mc_{j}",
|
|
"sender_ids": [scope_id],
|
|
},
|
|
{"Subject": f"subj {j}", "Summary": f"sum {j}", "Content": f"content {j}"},
|
|
)
|
|
|
|
|
|
def _fs_item(scope_id: str, j: int):
|
|
return (
|
|
{
|
|
"owner_id": scope_id,
|
|
"session_id": f"s_{j}",
|
|
"timestamp": "2026-05-19T07:04:26+00:00",
|
|
"parent_id": f"mc_{j}",
|
|
"sender_ids": [scope_id],
|
|
},
|
|
{"Foresight": f"foresight body {j}"},
|
|
)
|
|
|
|
|
|
def _ac_item(scope_id: str, j: int):
|
|
return (
|
|
{
|
|
"owner_id": scope_id,
|
|
"session_id": f"s_{j}",
|
|
"timestamp": "2026-05-19T07:04:26+00:00",
|
|
"parent_id": f"mc_{j}",
|
|
"quality_score": 0.9,
|
|
},
|
|
{
|
|
"TaskIntent": f"task intent {j}",
|
|
"Approach": f"approach {j}",
|
|
"KeyInsight": f"insight {j}",
|
|
},
|
|
)
|
|
|
|
|
|
_KIND_CASES: list[_DailyLogKindCase] = [
|
|
_DailyLogKindCase(
|
|
name="atomic_fact",
|
|
scope="users",
|
|
dir_name=".atomic_facts",
|
|
file_prefix="atomic_fact",
|
|
writer_factory=AtomicFactWriter,
|
|
repo=atomic_fact_repo,
|
|
table_cls=AtomicFact,
|
|
build_item=_af_item,
|
|
),
|
|
_DailyLogKindCase(
|
|
name="episode",
|
|
scope="users",
|
|
dir_name="episodes",
|
|
file_prefix="episode",
|
|
writer_factory=EpisodeWriter,
|
|
repo=episode_repo,
|
|
table_cls=Episode,
|
|
build_item=_ep_item,
|
|
),
|
|
_DailyLogKindCase(
|
|
name="foresight",
|
|
scope="users",
|
|
dir_name=".foresights",
|
|
file_prefix="foresight",
|
|
writer_factory=ForesightWriter,
|
|
repo=foresight_repo,
|
|
table_cls=Foresight,
|
|
build_item=_fs_item,
|
|
),
|
|
_DailyLogKindCase(
|
|
name="agent_case",
|
|
scope="agents",
|
|
dir_name=".cases",
|
|
file_prefix="agent_case",
|
|
writer_factory=AgentCaseWriter,
|
|
repo=agent_case_repo,
|
|
table_cls=AgentCase,
|
|
build_item=_ac_item,
|
|
),
|
|
]
|
|
|
|
|
|
async def _wait_path_done(md_path: str, *, deadline: float = 15.0) -> None:
|
|
async with asyncio.timeout(deadline):
|
|
while True: # noqa: ASYNC110 - polling cascade state
|
|
row = await md_change_state_repo.get_by_id(md_path)
|
|
if row is not None:
|
|
break
|
|
await asyncio.sleep(0.05)
|
|
while True: # noqa: ASYNC110 - polling cascade state
|
|
row = await md_change_state_repo.get_by_id(md_path)
|
|
if row is not None and row.status in ("done", "failed"):
|
|
break
|
|
await asyncio.sleep(0.05)
|
|
await asyncio.sleep(0.1)
|
|
|
|
|
|
@pytest.mark.parametrize("case", _KIND_CASES, ids=lambda c: c.name)
|
|
async def test_md_lance_strict_consistency_per_kind(
|
|
cascade_runtime: MemoryRoot,
|
|
case: _DailyLogKindCase,
|
|
) -> None:
|
|
"""Per-kind strict equality: md entries / frontmatter / lance rows all == N."""
|
|
memory_root = cascade_runtime
|
|
orchestrator = CascadeOrchestrator(
|
|
memory_root=memory_root,
|
|
embedder=_StubEmbedder(),
|
|
tokenizer=build_tokenizer(),
|
|
config=CascadeConfig(
|
|
scan_interval_seconds=60.0,
|
|
worker_batch_size=20,
|
|
worker_max_retry=1,
|
|
worker_poll_interval_seconds=0.05,
|
|
worker_retry_backoff_seconds=0.0,
|
|
),
|
|
)
|
|
await orchestrator.start()
|
|
await asyncio.sleep(0.3)
|
|
|
|
try:
|
|
writer = case.writer_factory(root=memory_root)
|
|
scope_id = f"sid_{case.name}"
|
|
bucket = _dt.date(2026, 5, 19)
|
|
n = 5
|
|
items = [case.build_item(scope_id, j) for j in range(n)]
|
|
eids = await writer.append_entries(scope_id, items, date=bucket)
|
|
assert len(eids) == n, f"writer returned {len(eids)} eids, expected {n}"
|
|
|
|
md_path = (
|
|
f"default_app/default_project/{case.scope}/{scope_id}/{case.dir_name}/"
|
|
f"{case.file_prefix}-{bucket.isoformat()}.md"
|
|
)
|
|
absolute = memory_root.root / md_path
|
|
await _wait_path_done(md_path)
|
|
|
|
# 1) frontmatter.entry_count == N
|
|
parsed = await MarkdownReader.read(absolute)
|
|
assert parsed.frontmatter.get("entry_count") == n, (
|
|
f"{case.name}: frontmatter.entry_count="
|
|
f"{parsed.frontmatter.get('entry_count')}, expected {n}"
|
|
)
|
|
|
|
# 2) md entry blocks == N
|
|
assert len(parsed.entries) == n, (
|
|
f"{case.name}: md has {len(parsed.entries)} entry blocks, expected {n}"
|
|
)
|
|
|
|
# 3) lance count_rows(md_path) == N (strict equality)
|
|
table = await get_table(case.table_cls.TABLE_NAME, case.table_cls)
|
|
lance_count = await table.count_rows(filter=f"md_path = '{md_path}'")
|
|
assert lance_count == n, (
|
|
f"{case.name}: md={n} lance={lance_count} for {md_path}"
|
|
)
|
|
|
|
# 4) lance entry_id set == md entry_id set
|
|
lance_rows = await case.repo.find_where(f"md_path = '{md_path}'", limit=100)
|
|
lance_eids = {r.entry_id for r in lance_rows}
|
|
md_eids = {e.id for e in parsed.entries}
|
|
assert lance_eids == md_eids, (
|
|
f"{case.name}: lance eids {lance_eids} != md eids {md_eids}"
|
|
)
|
|
|
|
# 4b) lance content_sha256 per entry == md-recomputed content_sha256
|
|
# Catches "id present but content mismatched" — orthogonal to (4).
|
|
handler_cls = next(
|
|
spec.handler_factory for spec in KIND_REGISTRY if spec.name == case.name
|
|
)
|
|
md_sha_by_id = {
|
|
e.id: _daily_log_sha_for_entry(handler_cls, e.as_structured())
|
|
for e in parsed.entries
|
|
}
|
|
lance_sha_by_id = {r.entry_id: r.content_sha256 for r in lance_rows}
|
|
assert md_sha_by_id == lance_sha_by_id, (
|
|
f"{case.name}: per-entry content_sha256 mismatch "
|
|
f"@ {md_path}: md={md_sha_by_id} lance={lance_sha_by_id}"
|
|
)
|
|
|
|
# 5) row state row is terminally done (not failed)
|
|
state_row = await md_change_state_repo.get_by_id(md_path)
|
|
assert state_row is not None and state_row.status == "done", (
|
|
f"{case.name}: state row status={state_row.status if state_row else 'NONE'}"
|
|
)
|
|
finally:
|
|
await orchestrator.stop()
|