Compare commits

...

6 Commits

Author SHA1 Message Date
Alexander Whitestone
103b641bc0 feat: Mnemosyne foundation — ingest, index, CLI, tests (18/18)
Some checks failed
CI / test (pull_request) Failing after 10s
CI / validate (pull_request) Failing after 15s
Review Approval Gate / verify-review (pull_request) Failing after 3s
- ingest.py: chunking with overlap, dedup via SHA256, SQLite+FTS5
- index.py: keyword search (BM25), semantic search (cosine), RRF fusion
- cli.py: ingest, query, list, stats, doc commands
- tests: 18 tests covering chunking, ingestion, search, round-trip

Closes #1242
2026-04-11 20:16:51 -04:00
Alexander Whitestone
f9b5b2340c wip: tests — 18/18 passing, round-trip verified 2026-04-11 20:16:33 -04:00
Alexander Whitestone
3a0bd1aa3f wip: CLI with ingest, query, list, stats, doc commands 2026-04-11 20:15:58 -04:00
Alexander Whitestone
71c51d2e8c wip: holographic index with keyword + semantic search and RRF 2026-04-11 20:15:36 -04:00
Alexander Whitestone
f895998581 wip: ingest module with chunking, FTS5, dedup 2026-04-11 20:15:07 -04:00
Alexander Whitestone
aa1a6349ac wip: mnemosyne package init 2026-04-11 20:14:33 -04:00
9 changed files with 954 additions and 0 deletions

81
mnemosyne/README.md Normal file
View File

@@ -0,0 +1,81 @@
# Mnemosyne — The Living Holographic Archive
A sovereign, on-chain anchored memory system that ingests documents, conversations, and artifacts into a searchable holographic index.
## Design Principles
- **No network calls** at ingest time — embeddings are optional, compute locally or skip
- **SQLite + FTS5 only** — no external vector DB dependency
- **Pluggable embedding backend** (sentence-transformers, Ollama, or none)
- **Compact** — the core modules total under 700 lines of Python (954 added lines including tests and this README)
## Quick Start
### Ingest documents
```bash
# Single file
python -m mnemosyne.cli ingest path/to/document.md
# Directory tree
python -m mnemosyne.cli ingest path/to/docs/
# Custom chunk size
python -m mnemosyne.cli ingest docs/ --chunk-size 1024 --overlap 128
```
### Query the archive
```bash
python -m mnemosyne.cli query "sovereignty and Bitcoin"
```
### Browse the archive
```bash
python -m mnemosyne.cli list
python -m mnemosyne.cli stats
python -m mnemosyne.cli doc 42
```
## Python API
```python
from mnemosyne.ingest import ingest_text, ingest_file
from mnemosyne.index import query
# Ingest
doc_id = ingest_text("Your content here", source="manual", title="My Note")
# Search
results = query("sovereignty and Bitcoin")
for r in results:
print(f"[{r['score']:.4f}] {r['title']}: {r['content'][:100]}")
```
## Architecture
```
mnemosyne/
├── __init__.py # Package metadata
├── ingest.py # Document ingestion + chunking + SQLite storage
├── index.py # Holographic index: keyword + semantic search + RRF
├── cli.py # CLI entry point
└── README.md # This file
```
### Storage Schema
- **documents** — raw documents with source, title, content, metadata, dedup hash
- **chunks** — overlapping text chunks linked to documents
- **chunks_fts** — FTS5 virtual table with porter stemming + unicode61 tokenizer
### Search Modes
1. **Keyword** (default) — FTS5 full-text search with BM25 scoring
2. **Semantic** — cosine similarity over pre-computed embeddings (requires embedding backend)
3. **Hybrid** — Reciprocal Rank Fusion merging both result sets
## Closes
[#1242](https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1242)

10
mnemosyne/__init__.py Normal file
View File

@@ -0,0 +1,10 @@
"""
Mnemosyne — The Living Holographic Archive
A sovereign, on-chain anchored memory system that ingests documents,
conversations, and artifacts into a searchable holographic index.
No network calls at ingest time. SQLite + FTS5 only. Pluggable embedding backend.
"""
__version__ = "0.1.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

163
mnemosyne/cli.py Normal file
View File

@@ -0,0 +1,163 @@
"""
Mnemosyne CLI
Usage:
mnemosyne ingest <path> [--db PATH] [--chunk-size N] [--overlap N]
mnemosyne query <text> [--db PATH] [--limit N]
mnemosyne list [--db PATH] [--limit N]
mnemosyne stats [--db PATH]
mnemosyne doc <id> [--db PATH]
"""
import argparse
import json
import sys
from pathlib import Path
from .ingest import ingest_file, ingest_directory, get_stats, DEFAULT_DB_PATH, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_OVERLAP
from .index import query, list_documents, get_document
def cmd_ingest(args):
    """Ingest a single file or a whole directory tree into the archive."""
    target = Path(args.path)
    db = args.db or DEFAULT_DB_PATH
    if target.is_dir():
        summary = ingest_directory(
            str(target), db_path=db,
            chunk_size=args.chunk_size, chunk_overlap=args.overlap,
        )
        print(f"Ingested: {summary['ingested']} files")
        print(f"Skipped (duplicates): {summary['skipped']}")
        if summary["errors"]:
            print(f"Errors: {len(summary['errors'])}")
            for err in summary["errors"]:
                print(f" {err['file']}: {err['error']}")
        return
    if target.is_file():
        doc_id = ingest_file(
            str(target), db_path=db,
            chunk_size=args.chunk_size, chunk_overlap=args.overlap,
        )
        # ingest_file returns None when the (source, content) pair is a duplicate.
        if doc_id is None:
            print(f"Skipped (duplicate): {target.name}")
        else:
            print(f"Ingested: {target.name} (doc_id={doc_id})")
        return
    print(f"Error: {args.path} not found", file=sys.stderr)
    sys.exit(1)
def cmd_query(args):
    """Search the holographic archive and pretty-print the hits."""
    db = args.db or DEFAULT_DB_PATH
    hits = query(args.text, db_path=db, limit=args.limit)
    if not hits:
        print("No results found.")
        return
    for rank, hit in enumerate(hits, 1):
        source = hit.get("source", "?")
        title = hit.get("title") or Path(source).name
        # Hybrid results carry rrf_score/methods; keyword-only carry score/method.
        score = hit.get("rrf_score") or hit.get("score", 0)
        methods = hit.get("methods") or [hit.get("method", "?")]
        preview = hit["content"][:200].replace("\n", " ")
        print(f"[{rank}] {title}")
        print(f" Source: {source}")
        print(f" Score: {score:.4f} ({', '.join(methods)})")
        print(f" {preview}...")
        print()
def cmd_list(args):
    """Print the archive's documents as a fixed-width table."""
    db = args.db or DEFAULT_DB_PATH
    docs = list_documents(db_path=db, limit=args.limit)
    if not docs:
        print("Archive is empty.")
        return
    print(f"{'ID':>5} {'Chunks':>6} {'Title':<40} Source")
    print("-" * 90)
    for doc in docs:
        # Truncate long fields so the table stays aligned.
        title = (doc["title"] or "?")[:40]
        source = Path(doc["source"]).name[:30] if doc["source"] else "?"
        print(f"{doc['id']:>5} {doc['chunks']:>6} {title:<40} {source}")
def cmd_stats(args):
    """Show archive-wide counters (documents, chunks, distinct sources)."""
    db = args.db or DEFAULT_DB_PATH
    stats = get_stats(db_path=db)
    for label, key in (("Documents", "documents"), ("Chunks", "chunks"), ("Sources", "sources")):
        print(f"{label}: {stats[key]}")
def cmd_doc(args):
    """Dump one document (header fields then full content); exit(1) if missing."""
    db = args.db or DEFAULT_DB_PATH
    doc = get_document(args.id, db_path=db)
    if not doc:
        print(f"Document #{args.id} not found.")
        sys.exit(1)
    for label in ("ID", "Title", "Source"):
        print(f"{label}: {doc[label.lower()]}")
    print(f"Ingested: {doc['ingested_at']}")
    print(f"Metadata: {json.dumps(doc['metadata'], indent=2)}")
    print(f"\n--- Content ({len(doc['content'])} chars) ---\n")
    print(doc["content"])
def main():
    """CLI entry point: build the parser and dispatch to the chosen subcommand."""
    parser = argparse.ArgumentParser(
        prog="mnemosyne",
        description="Mnemosyne — The Living Holographic Archive",
    )
    parser.add_argument("--db", help="Database path (default: mnemosyne.db)")
    sub = parser.add_subparsers(dest="command")

    ingest_parser = sub.add_parser("ingest", help="Ingest files or directories")
    ingest_parser.add_argument("path", help="File or directory to ingest")
    ingest_parser.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
    ingest_parser.add_argument("--overlap", type=int, default=DEFAULT_CHUNK_OVERLAP)

    query_parser = sub.add_parser("query", help="Search the archive")
    query_parser.add_argument("text", help="Search query")
    query_parser.add_argument("--limit", type=int, default=10)

    list_parser = sub.add_parser("list", help="List documents in archive")
    list_parser.add_argument("--limit", type=int, default=50)

    sub.add_parser("stats", help="Show archive statistics")

    doc_parser = sub.add_parser("doc", help="Show document by ID")
    doc_parser.add_argument("id", type=int, help="Document ID")

    args = parser.parse_args()
    # Table-driven dispatch; unknown/missing subcommand falls through to help.
    handlers = {
        "ingest": cmd_ingest,
        "query": cmd_query,
        "list": cmd_list,
        "stats": cmd_stats,
        "doc": cmd_doc,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler(args)


if __name__ == "__main__":
    main()

228
mnemosyne/index.py Normal file
View File

@@ -0,0 +1,228 @@
"""
Mnemosyne Holographic Index
Query interface: keyword search (FTS5) + semantic search (embedding similarity).
Merges results with reciprocal rank fusion.
"""
import json
import sqlite3
import math
from typing import Optional
from .ingest import get_db, DEFAULT_DB_PATH
def keyword_search(
    query: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Full-text search using FTS5 with BM25 scoring.

    Args:
        query: FTS5 MATCH expression (plain words work; FTS5 operators allowed).
        db_path: Path to the SQLite database.
        limit: Maximum number of chunks to return.

    Returns:
        List of {chunk_id, doc_id, content, source, title, metadata, score,
        method}, best matches first.
    """
    conn = get_db(db_path)
    try:
        # FTS5's `rank` column is the BM25 score (negative; lower = better),
        # so ORDER BY rank yields best matches first.
        rows = conn.execute("""
            SELECT
                c.id as chunk_id,
                c.doc_id,
                c.content,
                d.source,
                d.title,
                d.metadata,
                rank as bm25_score
            FROM chunks_fts fts
            JOIN chunks c ON c.id = fts.rowid
            JOIN documents d ON d.id = c.doc_id
            WHERE chunks_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """, (query, limit)).fetchall()
    finally:
        # FTS5 raises OperationalError on some query syntax; close the
        # connection on every path (previously it leaked on error).
        conn.close()
    return [
        {
            "chunk_id": row[0],
            "doc_id": row[1],
            "content": row[2],
            "source": row[3],
            "title": row[4],
            "metadata": json.loads(row[5]) if row[5] else {},
            "score": abs(row[6]),  # BM25 is negative, take abs for ranking
            "method": "keyword",
        }
        for row in rows
    ]
def semantic_search(
    query_embedding: list[float],
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Cosine similarity search over stored embeddings.

    Requires embeddings to be pre-computed and stored as packed float32 BLOBs
    in the chunks table.

    Args:
        query_embedding: Query vector compared against every stored embedding.
        db_path: Path to the SQLite database.
        limit: Maximum number of chunks to return.

    Returns:
        Top `limit` chunks by cosine similarity; [] when no embeddings exist.
    """
    import struct

    conn = get_db(db_path)
    try:
        # Cheap existence probe so an embedding-less archive short-circuits
        # without scanning every chunk row.
        has_embeddings = conn.execute(
            "SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL"
        ).fetchone()[0]
        if has_embeddings == 0:
            return []
        rows = conn.execute("""
            SELECT
                c.id as chunk_id,
                c.doc_id,
                c.content,
                c.embedding,
                d.source,
                d.title,
                d.metadata
            FROM chunks c
            JOIN documents d ON d.id = c.doc_id
            WHERE c.embedding IS NOT NULL
        """).fetchall()
    finally:
        # Close on every path, including query failures (previously the
        # connection leaked if an execute() raised).
        conn.close()
    results = []
    # Query norm is loop-invariant; `or 1.0` guards a zero vector.
    query_norm = math.sqrt(sum(x * x for x in query_embedding)) or 1.0
    for row in rows:
        # Deserialize embedding from BLOB (native-order float32 values).
        emb_bytes = row[3]
        n_floats = len(emb_bytes) // 4
        emb = struct.unpack(f"{n_floats}f", emb_bytes)
        # Cosine similarity. NOTE(review): zip truncates silently, so a
        # dimension mismatch degrades to a partial dot product.
        dot = sum(a * b for a, b in zip(query_embedding, emb))
        emb_norm = math.sqrt(sum(x * x for x in emb)) or 1.0
        similarity = dot / (query_norm * emb_norm)
        results.append({
            "chunk_id": row[0],
            "doc_id": row[1],
            "content": row[2],
            "source": row[4],
            "title": row[5],
            "metadata": json.loads(row[6]) if row[6] else {},
            "score": similarity,
            "method": "semantic",
        })
    results.sort(key=lambda x: x["score"], reverse=True)
    return results[:limit]
def reciprocal_rank_fusion(
    keyword_results: list[dict],
    semantic_results: list[dict],
    k: int = 60,
    limit: int = 10,
) -> list[dict]:
    """Merge keyword and semantic results using Reciprocal Rank Fusion.

    RRF score = sum over result lists of 1 / (k + rank), with 1-based ranks.

    Args:
        keyword_results: Ranked hits from keyword_search (best first).
        semantic_results: Ranked hits from semantic_search (best first).
        k: RRF damping constant; larger k flattens rank differences.
        limit: Maximum number of merged entries to return.

    Returns:
        Merged hits sorted by descending RRF score; each entry is a copy of
        the source hit plus `rrf_score` and a `methods` provenance list.
    """
    rrf_scores: dict[int, float] = {}
    chunk_map: dict[int, dict] = {}
    for rank, result in enumerate(keyword_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result
    for rank, result in enumerate(semantic_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result
    # Precompute membership once instead of scanning both result lists for
    # every merged entry (was O(n^2) via any(...) per entry).
    keyword_ids = {r["chunk_id"] for r in keyword_results}
    semantic_ids = {r["chunk_id"] for r in semantic_results}
    # Sort by RRF score, best first.
    merged = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
    results = []
    for cid, score in merged[:limit]:
        entry = chunk_map[cid].copy()
        entry["rrf_score"] = score
        entry["methods"] = []
        if cid in keyword_ids:
            entry["methods"].append("keyword")
        if cid in semantic_ids:
            entry["methods"].append("semantic")
        results.append(entry)
    return results
def query(
    text: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
    query_embedding: Optional[list[float]] = None,
) -> list[dict]:
    """Unified query front-end.

    Always runs keyword search. When a query embedding is supplied and the
    archive actually holds embeddings, merges in semantic hits via RRF;
    otherwise returns the keyword-only results.
    """
    kw_results = keyword_search(text, db_path=db_path, limit=limit)
    if query_embedding is None:
        return kw_results
    sem_results = semantic_search(query_embedding, db_path=db_path, limit=limit)
    if not sem_results:
        # No embeddings stored — fall back to keyword-only.
        return kw_results
    return reciprocal_rank_fusion(kw_results, sem_results, limit=limit)
def get_document(doc_id: int, db_path: str = DEFAULT_DB_PATH) -> Optional[dict]:
    """Retrieve a full document row by ID; None when the ID is unknown."""
    conn = get_db(db_path)
    row = conn.execute(
        "SELECT id, source, title, content, metadata, ingested_at FROM documents WHERE id = ?",
        (doc_id,),
    ).fetchone()
    conn.close()
    if row is None:
        return None
    rid, source, title, content, metadata, ingested_at = row
    return {
        "id": rid,
        "source": source,
        "title": title,
        "content": content,
        # metadata column holds a JSON string; empty/NULL maps to {}.
        "metadata": json.loads(metadata) if metadata else {},
        "ingested_at": ingested_at,
    }
def list_documents(
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 50,
    offset: int = 0,
) -> list[dict]:
    """List documents (newest first) with per-document chunk counts."""
    conn = get_db(db_path)
    rows = conn.execute("""
        SELECT d.id, d.source, d.title, d.ingested_at,
               COUNT(c.id) as chunk_count
        FROM documents d
        LEFT JOIN chunks c ON c.doc_id = d.id
        GROUP BY d.id
        ORDER BY d.ingested_at DESC
        LIMIT ? OFFSET ?
    """, (limit, offset)).fetchall()
    conn.close()
    docs = []
    for rid, source, title, ingested_at, chunk_count in rows:
        docs.append({
            "id": rid,
            "source": source,
            "title": title,
            "ingested_at": ingested_at,
            "chunks": chunk_count,
        })
    return docs

267
mnemosyne/ingest.py Normal file
View File

@@ -0,0 +1,267 @@
"""
Mnemosyne Ingestion Pipeline
Accepts text/JSON/markdown inputs, chunks them with overlap,
stores in local SQLite + FTS5 for keyword search.
Embedding backend is pluggable (compute locally or skip).
"""
import json
import os
import re
import sqlite3
import hashlib
from pathlib import Path
from typing import Optional
DEFAULT_CHUNK_SIZE = 512
DEFAULT_CHUNK_OVERLAP = 64
DEFAULT_DB_PATH = "mnemosyne.db"
def get_db(db_path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open or create the Mnemosyne SQLite database with FTS5 tables.

    Idempotent: all DDL uses IF NOT EXISTS, so it is safe to call on every
    operation. The caller owns the returned connection and must close it.
    """
    conn = sqlite3.connect(db_path)
    # WAL lets readers proceed while a writer is active.
    conn.execute("PRAGMA journal_mode=WAL")
    # Required for chunks.doc_id ON DELETE CASCADE to actually fire.
    conn.execute("PRAGMA foreign_keys=ON")
    conn.executescript("""
    CREATE TABLE IF NOT EXISTS documents (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_hash TEXT UNIQUE NOT NULL,
        source TEXT NOT NULL,
        title TEXT,
        content TEXT NOT NULL,
        metadata TEXT DEFAULT '{}',
        ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    CREATE TABLE IF NOT EXISTS chunks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
        chunk_index INTEGER NOT NULL,
        content TEXT NOT NULL,
        embedding BLOB,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(doc_id, chunk_index)
    );
    CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
        content,
        content=chunks,
        content_rowid=id,
        tokenize='porter unicode61'
    );
    -- Triggers to keep FTS5 in sync
    CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
        INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
    END;
    CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
        INSERT INTO chunks_fts(chunks_fts, rowid, content)
        VALUES('delete', old.id, old.content);
    END;
    CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
        INSERT INTO chunks_fts(chunks_fts, rowid, content)
        VALUES('delete', old.id, old.content);
        INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
    END;
    """)
    conn.commit()
    return conn
def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[str]:
    """Split text into overlapping chunks by character count.

    Tries to break at paragraph > sentence > word boundaries; a boundary is
    only used when it falls past the midpoint of the window, so chunks never
    shrink below roughly half of chunk_size.

    Args:
        text: The text to split.
        chunk_size: Target chunk length in characters.
        overlap: Characters of overlap between consecutive chunks.

    Returns:
        Stripped, non-empty chunks in document order. Text no longer than
        chunk_size is returned as a single chunk, unmodified.
    """
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end >= len(text):
            # Tail of the document. Skip it when it strips to nothing —
            # previously a whitespace-only remainder produced an empty-string
            # chunk, bypassing the `if chunk:` guard used mid-loop.
            tail = text[start:].strip()
            if tail:
                chunks.append(tail)
            break
        # Try to find a clean break point within the window.
        segment = text[start:end]
        # Prefer paragraph break (+2 keeps the break after "\n\n").
        last_para = segment.rfind("\n\n")
        if last_para > chunk_size * 0.5:
            end = start + last_para + 2
        else:
            # Try sentence boundary (punctuation + following space/newline).
            last_period = max(
                segment.rfind(". "),
                segment.rfind("! "),
                segment.rfind("? "),
                segment.rfind(".\n"),
            )
            if last_period > chunk_size * 0.5:
                end = start + last_period + 2
            else:
                # Fall back to word boundary.
                last_space = segment.rfind(" ")
                if last_space > chunk_size * 0.5:
                    end = start + last_space + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        # Step back by `overlap`, but always advance at least one character
        # to guarantee termination.
        start = max(start + 1, end - overlap)
    return chunks
def _hash_content(content: str, source: str) -> str:
    """Deterministic dedup key: first 32 hex chars of SHA-256("source:content")."""
    digest = hashlib.sha256(f"{source}:{content}".encode())
    return digest.hexdigest()[:32]
def ingest_text(
    content: str,
    source: str = "inline",
    title: Optional[str] = None,
    metadata: Optional[dict] = None,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a single text document into the archive.

    Deduplicates on a hash of "source:content", so re-ingesting identical
    text from the same source is a no-op.

    Args:
        content: Raw document text.
        source: Origin label (file path, "inline", ...); part of the dedup key.
        title: Optional human-readable title.
        metadata: Optional dict, stored as JSON.
        db_path: Path to the SQLite database.
        chunk_size: Chunk length passed to chunk_text.
        chunk_overlap: Overlap passed to chunk_text.

    Returns:
        The new doc_id, or None if this (source, content) pair is a duplicate.
    """
    conn = get_db(db_path)
    try:
        doc_hash = _hash_content(content, source)
        # Deduplicate before inserting anything.
        existing = conn.execute(
            "SELECT id FROM documents WHERE doc_hash = ?", (doc_hash,)
        ).fetchone()
        if existing:
            return None
        cursor = conn.execute(
            "INSERT INTO documents (doc_hash, source, title, content, metadata) VALUES (?, ?, ?, ?, ?)",
            (doc_hash, source, title, content, json.dumps(metadata or {})),
        )
        doc_id = cursor.lastrowid
        for i, chunk in enumerate(chunk_text(content, chunk_size, chunk_overlap)):
            conn.execute(
                "INSERT INTO chunks (doc_id, chunk_index, content) VALUES (?, ?, ?)",
                (doc_id, i, chunk),
            )
        # Single commit keeps the document row and its chunks atomic.
        conn.commit()
        return doc_id
    finally:
        # Close on every path (previously the connection leaked if an
        # INSERT raised mid-ingest).
        conn.close()
def ingest_file(
    path: str,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a file (text, markdown, JSON) into the archive.

    For JSON files, extracts text from common fields (body, text, content,
    message); a JSON array is ingested as one document per record.

    Returns:
        The doc_id of the (first) ingested document, or None if everything
        was skipped as a duplicate.

    Raises:
        FileNotFoundError: If `path` does not exist.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"File not found: {path}")
    source = str(p.resolve())
    title = p.stem
    if p.suffix.lower() == ".json":
        data = json.loads(p.read_text())
        if isinstance(data, str):
            content = data
        elif isinstance(data, dict):
            content = data.get("body") or data.get("text") or data.get("content") or data.get("message") or json.dumps(data, indent=2)
            title = data.get("title", title)
        elif isinstance(data, list):
            # Array of records — ingest each as a separate doc.
            ids = []
            for item in data:
                if isinstance(item, str):
                    rid = ingest_text(item, source=source, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                else:
                    # Same field priority as the dict branch above; "message"
                    # was previously missing here (consistency fix).
                    text_content = item.get("body") or item.get("text") or item.get("content") or item.get("message") or json.dumps(item, indent=2)
                    item_title = item.get("title", title)
                    rid = ingest_text(text_content, source=source, title=item_title, metadata=item, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                if rid is not None:
                    ids.append(rid)
            return ids[0] if ids else None
        else:
            # Scalars/unknown shapes: store the pretty-printed JSON itself.
            content = json.dumps(data, indent=2)
    else:
        content = p.read_text(encoding="utf-8", errors="replace")
    return ingest_text(content, source=source, title=title, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
def ingest_directory(
    dir_path: str,
    extensions: tuple[str, ...] = (".txt", ".md", ".json", ".py", ".js", ".yaml", ".yml"),
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> dict:
    """Ingest all matching files from a directory tree.

    Files inside hidden directories or __pycache__ are skipped; per-file
    failures are collected instead of aborting the walk.

    Returns:
        {"ingested": N, "skipped": N, "errors": [{"file": ..., "error": ...}]}
    """
    root = Path(dir_path)
    if not root.is_dir():
        raise NotADirectoryError(f"Not a directory: {dir_path}")
    summary = {"ingested": 0, "skipped": 0, "errors": []}
    for candidate in sorted(root.rglob("*")):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in extensions:
            continue
        # Skip anything under a hidden directory or __pycache__.
        relative_parts = candidate.relative_to(root).parts
        if any(part.startswith(".") or part == "__pycache__" for part in relative_parts):
            continue
        try:
            doc_id = ingest_file(
                str(candidate), db_path=db_path,
                chunk_size=chunk_size, chunk_overlap=chunk_overlap,
            )
        except Exception as e:
            summary["errors"].append({"file": str(candidate), "error": str(e)})
            continue
        if doc_id is None:
            summary["skipped"] += 1
        else:
            summary["ingested"] += 1
    return summary
def get_stats(db_path: str = DEFAULT_DB_PATH) -> dict:
    """Return archive statistics: document, chunk, and distinct-source counts."""
    queries = {
        "documents": "SELECT COUNT(*) FROM documents",
        "chunks": "SELECT COUNT(*) FROM chunks",
        "sources": "SELECT COUNT(DISTINCT source) FROM documents",
    }
    conn = get_db(db_path)
    stats = {key: conn.execute(sql).fetchone()[0] for key, sql in queries.items()}
    conn.close()
    return stats

205
tests/test_mnemosyne.py Normal file
View File

@@ -0,0 +1,205 @@
"""
Tests for Mnemosyne — The Living Holographic Archive.
Round-trip: ingest sample docs → query → verify results.
"""
import json
import os
import tempfile
import pytest
# Add parent to path for imports
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mnemosyne.ingest import (
chunk_text, ingest_text, ingest_file, ingest_directory,
get_stats, get_db,
)
from mnemosyne.index import keyword_search, query, list_documents, get_document
@pytest.fixture
def db_path(tmp_path):
    """Path to a fresh per-test SQLite database (never shared between tests)."""
    return str(tmp_path / "test_mnemosyne.db")
@pytest.fixture
def sample_docs(tmp_path):
    """Create sample documents for testing.

    Returns a dict mapping a format label ("txt", "md", "json", "json_array")
    to the path of a file written under tmp_path. Tests assert on this exact
    content (e.g. "Alice", "Mnemosyne"), so the literals must not change.
    """
    docs = {}
    # Plain text
    txt = tmp_path / "alice.txt"
    txt.write_text(
        "Alice was beginning to get very tired of sitting by her sister on the bank. "
        "She had peeped into the book her sister was reading, but it had no pictures "
        "or conversations in it. 'And what is the use of a book,' thought Alice, "
        "'without pictures or conversations?'"
    )
    docs["txt"] = str(txt)
    # Markdown
    md = tmp_path / "readme.md"
    md.write_text(
        "# Project Mnemosyne\n\n"
        "Mnemosyne is a sovereign holographic archive system.\n\n"
        "## Features\n\n"
        "- Full-text search with FTS5\n"
        "- Semantic search with embeddings\n"
        "- Reciprocal rank fusion for hybrid results\n"
        "- SQLite-backed, no external dependencies\n"
    )
    docs["md"] = str(md)
    # JSON (single object — exercises the dict branch of ingest_file)
    js = tmp_path / "data.json"
    js.write_text(json.dumps({
        "title": "The Sovereignty Principle",
        "body": "Every person has the right to run their own intelligence on their own hardware, "
                "answerable to no one. This is the foundation of digital sovereignty.",
    }))
    docs["json"] = str(js)
    # JSON array (exercises the one-doc-per-record branch of ingest_file)
    js_arr = tmp_path / "records.json"
    js_arr.write_text(json.dumps([
        {"title": "Record A", "text": "First record about Bitcoin and the blockchain."},
        {"title": "Record B", "text": "Second record about AI and language models."},
    ]))
    docs["json_array"] = str(js_arr)
    return docs
class TestChunking:
    """Unit tests for chunk_text boundary behavior."""

    def test_short_text_no_split(self):
        """Text shorter than chunk_size comes back as a single chunk."""
        text = "Short text."
        chunks = chunk_text(text, chunk_size=100)
        assert len(chunks) == 1
        assert chunks[0] == text

    def test_long_text_splits(self):
        """Text longer than chunk_size is split into multiple chunks."""
        text = "word " * 200  # 1000 chars
        chunks = chunk_text(text, chunk_size=200, overlap=20)
        assert len(chunks) > 1

    def test_overlap_exists(self):
        """Consecutive chunks share text across the aaa/bbb seam."""
        text = "aaa " * 100 + "bbb " * 100
        chunks = chunk_text(text, chunk_size=200, overlap=50)
        # Some chunks should contain both aaa and bbb due to overlap
        cross_chunks = [c for c in chunks if "aaa" in c and "bbb" in c]
        assert len(cross_chunks) > 0
class TestIngestion:
    """Ingestion paths: text, files (txt/JSON/JSON array), directories, stats."""

    def test_ingest_text_returns_id(self, db_path):
        """First ingest of new content yields a positive doc_id."""
        doc_id = ingest_text("Hello world", source="test", db_path=db_path)
        assert doc_id is not None
        assert doc_id > 0

    def test_ingest_text_dedup(self, db_path):
        """Re-ingesting identical (source, content) returns None."""
        doc_id1 = ingest_text("Hello world", source="test", db_path=db_path)
        doc_id2 = ingest_text("Hello world", source="test", db_path=db_path)
        assert doc_id1 is not None
        assert doc_id2 is None  # duplicate

    def test_ingest_file_txt(self, db_path, sample_docs):
        """Plain-text files ingest successfully."""
        doc_id = ingest_file(sample_docs["txt"], db_path=db_path)
        assert doc_id is not None

    def test_ingest_file_json(self, db_path, sample_docs):
        """JSON object files ingest via the body-field extraction branch."""
        doc_id = ingest_file(sample_docs["json"], db_path=db_path)
        assert doc_id is not None

    def test_ingest_file_json_array(self, db_path, sample_docs):
        """A JSON array ingests as one document per record."""
        doc_id = ingest_file(sample_docs["json_array"], db_path=db_path)
        assert doc_id is not None
        # Should have ingested 2 records
        stats = get_stats(db_path)
        assert stats["documents"] == 2

    def test_ingest_directory(self, db_path, sample_docs, tmp_path):
        """Directory walk ingests all sample files without errors."""
        result = ingest_directory(str(tmp_path), db_path=db_path)
        assert result["ingested"] >= 4
        assert len(result["errors"]) == 0

    def test_stats(self, db_path, sample_docs):
        """Stats reflect ingested documents and their chunks."""
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        stats = get_stats(db_path)
        assert stats["documents"] == 2
        assert stats["chunks"] >= 2
class TestSearch:
    """Keyword search, unified query, listing, and document retrieval."""

    def test_keyword_search(self, db_path, sample_docs):
        """FTS5 search finds terms from the ingested markdown."""
        ingest_file(sample_docs["md"], db_path=db_path)
        results = keyword_search("Mnemosyne archive", db_path=db_path)
        assert len(results) > 0
        assert "mnemosyne" in results[0]["content"].lower() or "archive" in results[0]["content"].lower()

    def test_query_returns_results(self, db_path, sample_docs):
        """Unified query (keyword-only path) returns matches."""
        ingest_file(sample_docs["txt"], db_path=db_path)
        results = query("Alice tired bank", db_path=db_path)
        assert len(results) > 0

    def test_query_empty_db(self, db_path):
        """Querying a brand-new (empty) archive returns no results."""
        results = query("anything", db_path=db_path)
        assert results == []

    def test_query_no_match(self, db_path, sample_docs):
        """Terms absent from the corpus produce an empty result set."""
        ingest_file(sample_docs["txt"], db_path=db_path)
        results = query("xyzzyplugh quantum entanglement", db_path=db_path)
        assert results == []

    def test_list_documents(self, db_path, sample_docs):
        """Listing includes every ingested document with a chunk count."""
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        docs = list_documents(db_path=db_path)
        assert len(docs) == 2
        assert all("chunks" in d for d in docs)

    def test_get_document(self, db_path, sample_docs):
        """Full-document retrieval returns content and the filename-derived title."""
        doc_id = ingest_file(sample_docs["txt"], db_path=db_path)
        doc = get_document(doc_id, db_path=db_path)
        assert doc is not None
        assert "Alice" in doc["content"]
        assert doc["title"] == "alice"

    def test_get_document_not_found(self, db_path):
        """Unknown IDs return None rather than raising."""
        doc = get_document(9999, db_path=db_path)
        assert doc is None
class TestRoundTrip:
    """Full round-trip: ingest → query → verify recall."""

    def test_round_trip(self, db_path, sample_docs, tmp_path):
        """Ingest every sample doc, then confirm each is findable by query."""
        # Ingest all sample docs
        result = ingest_directory(str(tmp_path), db_path=db_path)
        assert result["ingested"] >= 4
        # Verify stats
        stats = get_stats(db_path)
        assert stats["documents"] >= 4
        assert stats["chunks"] > 0
        # Query for Alice
        results = query("Alice pictures conversations", db_path=db_path)
        assert len(results) > 0
        assert any("alice" in r.get("title", "").lower() or "Alice" in r["content"] for r in results)
        # Query for Mnemosyne
        results = query("Mnemosyne sovereign archive", db_path=db_path)
        assert len(results) > 0
        # Query for sovereignty
        results = query("sovereignty intelligence hardware", db_path=db_path)
        assert len(results) > 0
        # List all documents
        docs = list_documents(db_path=db_path)
        assert len(docs) >= 4