md-first memory extraction framework for AI agents. Markdown is the single source of truth; SQLite holds state and LanceDB provides the rebuildable vector + BM25 + scalar index. The codebase follows a single-direction DDD layering (entrypoints -> service -> memory -> infra, with component / core / config cross-cutting) enforced by import-linter. Engineering surface: - Coding conventions in .claude/rules/ (path-scoped) and workflows in .claude/skills/ (/commit, /new-branch, /pr). - GitHub Actions CI runs make lint + test + integration; pre-commit mirrors the gates locally (ruff, hygiene hooks, gitlint commit-msg). - Commit messages follow Conventional Commits, enforced by gitlint. - make lint also enforces datetime two-zone discipline and OpenAPI drift.
136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
"""One-shot dumper: extract a search-test seed from a corpus snapshot.
|
|
|
|
Reads the LanceDB tables under
|
|
``/tmp/everos_corpus_v2/.index/lancedb/`` (the snapshot produced by
|
|
``tests/e2e/test_add_flush_user_pipeline_e2e.py`` with ``EVEROS_KEEP_CORPUS_TO``
|
|
set), samples a small representative slice, and emits JSON fixtures
|
|
under ``tests/fixtures/search_seed/``.
|
|
|
|
Sampling rules:
|
|
|
|
- **episode**: first 8 rows per owner (caroline + melanie). Captures
|
|
the parent_id (= memcell_id) set so downstream tables can be
|
|
bridge-consistent.
|
|
- **atomic_fact**: every row whose ``parent_id`` is in the episode-
|
|
parent set above, capped at 10 per owner to keep the seed compact. This
|
|
guarantees MRAG-fusion testing can verify "facts sharing a
|
|
memcell with the matched episode get embedded".
|
|
- **foresight**: 5 per owner. Archived for future use; current
|
|
``/search`` does not query foresight, so the seed only exists so
|
|
downstream tests can opt in without re-cutting the corpus.
|
|
- **user_profile**: 1 per owner (= 2 total).
|
|
|
|
Run::
|
|
|
|
python tests/fixtures/_dump_search_seed.py
|
|
|
|
Re-run any time the corpus changes; output JSON is committed to
|
|
git so other contributors don't need the corpus locally.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import lancedb
|
|
|
|
# LanceDB directory of the kept corpus snapshot that this script reads from.
CORPUS = Path("/tmp/everos_corpus_v2/.index/lancedb")

# Fixture directory that receives the JSON files; it sits next to this script.
OUT_DIR = Path(__file__).parent.joinpath("search_seed")

# Owners whose rows are sampled; the order here is the order used in the output.
ALL_OWNERS = ("caroline", "melanie")
|
|
|
|
|
|
def _serialise(row: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``row`` whose top-level values are JSON-friendly.

    Values exposing ``tolist()`` (e.g. numpy vectors) become plain lists and
    ``datetime`` values become ISO-8601 strings. Everything else, ``None``
    included, is passed through unchanged. Only top-level values are
    inspected; key order is preserved.
    """

    def _json_safe(value: Any) -> Any:
        # The array check comes first, matching the original branch order.
        if hasattr(value, "tolist"):
            return value.tolist()
        if isinstance(value, datetime):
            return value.isoformat()
        return value

    return {key: _json_safe(value) for key, value in row.items()}
|
|
|
|
|
|
def _read(db: lancedb.DBConnection, table: str) -> list[dict[str, Any]]:
    """Load every row of ``table`` from the corpus as a list of dicts.

    Args:
        db: Open LanceDB connection to the corpus directory.
        table: Table name without the ``.lance`` suffix.

    Raises:
        FileNotFoundError: If ``CORPUS`` has no ``<table>.lance`` entry.
    """
    # Check the on-disk entry up front so a missing table fails with a
    # message that names it.
    dataset_dir = f"{table}.lance"
    present = any(entry.name == dataset_dir for entry in CORPUS.iterdir())
    if not present:
        raise FileNotFoundError(f"corpus table missing: {table}")

    arrow_table = db.open_table(table).to_arrow()
    return arrow_table.to_pylist()
|
|
|
|
|
|
def _head_per_owner(
    rows: list[dict[str, Any]], limit: int
) -> list[dict[str, Any]]:
    """Return the first ``limit`` rows of each owner, owners in ``ALL_OWNERS`` order."""
    picked: list[dict[str, Any]] = []
    for owner in ALL_OWNERS:
        owned = [row for row in rows if row["owner_id"] == owner]
        picked.extend(owned[:limit])
    return picked


def main() -> None:
    """Sample the corpus tables and write one JSON fixture per table.

    Exits with status 1 (after printing a hint to stderr) when ``CORPUS``
    does not exist. Otherwise writes ``episode.json``, ``atomic_fact.json``,
    ``foresight.json`` and ``user_profile.json`` under ``OUT_DIR`` and prints
    a per-file summary of row count and size.
    """
    if not CORPUS.exists():
        for line in (
            f"corpus not found: {CORPUS}",
            "hint: run the add+flush pipeline first with",
            " EVEROS_KEEP_CORPUS_TO=/tmp/everos_corpus_v2",
        ):
            print(line, file=sys.stderr)
        sys.exit(1)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    db = lancedb.connect(str(CORPUS))

    # 1) Episodes: first 8 per owner. Their parent_id values (memcell ids)
    #    are the bridge that the atomic-fact sample must stay consistent with.
    episodes = _head_per_owner(_read(db, "episode"), 8)
    parent_memcells: set[str] = {row["parent_id"] for row in episodes}

    # 2) Atomic facts: only facts whose parent_id belongs to a sampled
    #    episode, capped at 10 per owner. Facts fan out per owner (one row per
    #    owner mentioned), so one global cap could leave an owner with none.
    bridged_facts = [
        row
        for row in _read(db, "atomic_fact")
        if row["parent_id"] in parent_memcells
    ]
    facts = _head_per_owner(bridged_facts, 10)

    # 3) Foresights: 5 per owner, archived for future use.
    foresights = _head_per_owner(_read(db, "foresight"), 5)

    # 4) User profiles: every row owned by a known owner, in table order.
    profiles = [
        row for row in _read(db, "user_profile") if row["owner_id"] in ALL_OWNERS
    ]

    seed = {
        "episode": episodes,
        "atomic_fact": facts,
        "foresight": foresights,
        "user_profile": profiles,
    }

    summary: list[tuple[str, int, int]] = []
    for table_name, rows in seed.items():
        payload = [_serialise(row) for row in rows]
        target = OUT_DIR / f"{table_name}.json"
        # default=str is the fallback for any value _serialise left as-is
        # that json cannot encode natively.
        target.write_text(json.dumps(payload, indent=2, default=str))
        summary.append((table_name, len(payload), target.stat().st_size))

    for name, count, size in summary:
        print(f" {name:14s}: {count:3d} rows ({size // 1024} KB)")
    print(f" parent_memcells captured: {len(parent_memcells)}")
|
|
|
|
|
|
# Script entry point: dump the seed only when this file is executed directly,
# never as a side effect of importing it.
if __name__ == "__main__":
    main()
|