Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
103b641bc0 | ||
|
|
f9b5b2340c | ||
|
|
3a0bd1aa3f | ||
|
|
71c51d2e8c | ||
|
|
f895998581 | ||
|
|
aa1a6349ac |
81
mnemosyne/README.md
Normal file
81
mnemosyne/README.md
Normal file
@@ -0,0 +1,81 @@
|
||||
# Mnemosyne — The Living Holographic Archive
|
||||
|
||||
A sovereign, on-chain anchored memory system that ingests documents, conversations, and artifacts into a searchable holographic index.
|
||||
|
||||
## Design Principles
|
||||
|
||||
- **No network calls** at ingest time — embeddings are optional, compute locally or skip
|
||||
- **SQLite + FTS5 only** — no external vector DB dependency
|
||||
- **Pluggable embedding backend** (sentence-transformers, Ollama, or none)
|
||||
- **Compact** — the entire module is under 500 lines of Python
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Ingest documents
|
||||
|
||||
```bash
|
||||
# Single file
|
||||
python -m mnemosyne.cli ingest path/to/document.md
|
||||
|
||||
# Directory tree
|
||||
python -m mnemosyne.cli ingest path/to/docs/
|
||||
|
||||
# Custom chunk size
|
||||
python -m mnemosyne.cli ingest docs/ --chunk-size 1024 --overlap 128
|
||||
```
|
||||
|
||||
### Query the archive
|
||||
|
||||
```bash
|
||||
python -m mnemosyne.cli query "sovereignty and Bitcoin"
|
||||
```
|
||||
|
||||
### Browse the archive
|
||||
|
||||
```bash
|
||||
python -m mnemosyne.cli list
|
||||
python -m mnemosyne.cli stats
|
||||
python -m mnemosyne.cli doc 42
|
||||
```
|
||||
|
||||
## Python API
|
||||
|
||||
```python
|
||||
from mnemosyne.ingest import ingest_text, ingest_file
|
||||
from mnemosyne.index import query
|
||||
|
||||
# Ingest
|
||||
doc_id = ingest_text("Your content here", source="manual", title="My Note")
|
||||
|
||||
# Search
|
||||
results = query("sovereignty and Bitcoin")
|
||||
for r in results:
|
||||
print(f"[{r['score']:.4f}] {r['title']}: {r['content'][:100]}")
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
mnemosyne/
|
||||
├── __init__.py # Package metadata
|
||||
├── ingest.py # Document ingestion + chunking + SQLite storage
|
||||
├── index.py # Holographic index: keyword + semantic search + RRF
|
||||
├── cli.py # CLI entry point
|
||||
└── README.md # This file
|
||||
```
|
||||
|
||||
### Storage Schema
|
||||
|
||||
- **documents** — raw documents with source, title, content, metadata, dedup hash
|
||||
- **chunks** — overlapping text chunks linked to documents
|
||||
- **chunks_fts** — FTS5 virtual table with porter stemming + unicode61 tokenizer
|
||||
|
||||
### Search Modes
|
||||
|
||||
1. **Keyword** (default) — FTS5 full-text search with BM25 scoring
|
||||
2. **Semantic** — cosine similarity over pre-computed embeddings (requires embedding backend)
|
||||
3. **Hybrid** — Reciprocal Rank Fusion merging both result sets
|
||||
|
||||
## Closes
|
||||
|
||||
[#1242](https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1242)
|
||||
10
mnemosyne/__init__.py
Normal file
10
mnemosyne/__init__.py
Normal file
@@ -0,0 +1,10 @@
|
||||
"""
|
||||
Mnemosyne — The Living Holographic Archive
|
||||
|
||||
A sovereign, on-chain anchored memory system that ingests documents,
|
||||
conversations, and artifacts into a searchable holographic index.
|
||||
|
||||
No network calls at ingest time. SQLite + FTS5 only. Pluggable embedding backend.
|
||||
"""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
BIN
mnemosyne/__pycache__/__init__.cpython-311.pyc
Normal file
BIN
mnemosyne/__pycache__/__init__.cpython-311.pyc
Normal file
Binary file not shown.
BIN
mnemosyne/__pycache__/index.cpython-311.pyc
Normal file
BIN
mnemosyne/__pycache__/index.cpython-311.pyc
Normal file
Binary file not shown.
BIN
mnemosyne/__pycache__/ingest.cpython-311.pyc
Normal file
BIN
mnemosyne/__pycache__/ingest.cpython-311.pyc
Normal file
Binary file not shown.
163
mnemosyne/cli.py
Normal file
163
mnemosyne/cli.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""
|
||||
Mnemosyne CLI
|
||||
|
||||
Usage:
|
||||
mnemosyne ingest <path> [--db PATH] [--chunk-size N] [--overlap N]
|
||||
mnemosyne query <text> [--db PATH] [--limit N]
|
||||
mnemosyne list [--db PATH] [--limit N]
|
||||
mnemosyne stats [--db PATH]
|
||||
mnemosyne doc <id> [--db PATH]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from .ingest import ingest_file, ingest_directory, get_stats, DEFAULT_DB_PATH, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_OVERLAP
|
||||
from .index import query, list_documents, get_document
|
||||
|
||||
|
||||
def cmd_ingest(args):
    """Ingest a single file or a whole directory tree into the archive."""
    target = Path(args.path)
    database = args.db or DEFAULT_DB_PATH

    if target.is_dir():
        summary = ingest_directory(
            str(target),
            db_path=database,
            chunk_size=args.chunk_size,
            chunk_overlap=args.overlap,
        )
        print(f"Ingested: {summary['ingested']} files")
        print(f"Skipped (duplicates): {summary['skipped']}")
        failures = summary["errors"]
        if failures:
            print(f"Errors: {len(failures)}")
            for failure in failures:
                print(f" {failure['file']}: {failure['error']}")
    elif target.is_file():
        new_id = ingest_file(
            str(target),
            db_path=database,
            chunk_size=args.chunk_size,
            chunk_overlap=args.overlap,
        )
        # ingest_file returns None for a content-identical duplicate.
        if new_id is None:
            print(f"Skipped (duplicate): {target.name}")
        else:
            print(f"Ingested: {target.name} (doc_id={new_id})")
    else:
        print(f"Error: {args.path} not found", file=sys.stderr)
        sys.exit(1)
|
||||
|
||||
|
||||
def cmd_query(args):
    """Run a search and pretty-print the ranked hits."""
    database = args.db or DEFAULT_DB_PATH
    hits = query(args.text, db_path=database, limit=args.limit)

    if not hits:
        print("No results found.")
        return

    for position, hit in enumerate(hits, 1):
        source = hit.get("source", "?")
        title = hit.get("title") or Path(source).name
        # Hybrid results carry rrf_score/methods; single-mode ones score/method.
        score = hit.get("rrf_score") or hit.get("score", 0)
        methods = hit.get("methods") or [hit.get("method", "?")]
        preview = hit["content"][:200].replace("\n", " ")

        print(f"[{position}] {title}")
        print(f" Source: {source}")
        print(f" Score: {score:.4f} ({', '.join(methods)})")
        print(f" {preview}...")
        print()
|
||||
|
||||
|
||||
def cmd_list(args):
    """Print a fixed-width table of archived documents."""
    database = args.db or DEFAULT_DB_PATH
    documents = list_documents(db_path=database, limit=args.limit)

    if not documents:
        print("Archive is empty.")
        return

    print(f"{'ID':>5} {'Chunks':>6} {'Title':<40} Source")
    print("-" * 90)
    for doc in documents:
        shown_title = (doc["title"] or "?")[:40]
        shown_source = Path(doc["source"]).name[:30] if doc["source"] else "?"
        print(f"{doc['id']:>5} {doc['chunks']:>6} {shown_title:<40} {shown_source}")
|
||||
|
||||
|
||||
def cmd_stats(args):
    """Print aggregate counts for the archive."""
    database = args.db or DEFAULT_DB_PATH
    totals = get_stats(db_path=database)
    # Same three lines as before, driven by a label/key table.
    for label, key in (("Documents", "documents"), ("Chunks", "chunks"), ("Sources", "sources")):
        print(f"{label}: {totals[key]}")
|
||||
|
||||
|
||||
def cmd_doc(args):
    """Print one document's metadata followed by its full content."""
    database = args.db or DEFAULT_DB_PATH
    record = get_document(args.id, db_path=database)
    if record is None:
        print(f"Document #{args.id} not found.")
        sys.exit(1)

    print(f"ID: {record['id']}")
    print(f"Title: {record['title']}")
    print(f"Source: {record['source']}")
    print(f"Ingested: {record['ingested_at']}")
    print(f"Metadata: {json.dumps(record['metadata'], indent=2)}")
    print(f"\n--- Content ({len(record['content'])} chars) ---\n")
    print(record["content"])
|
||||
|
||||
|
||||
def main():
    """CLI entry point: build the parser and dispatch to a subcommand."""
    parser = argparse.ArgumentParser(
        prog="mnemosyne",
        description="Mnemosyne — The Living Holographic Archive",
    )
    # Global option shared by every subcommand.
    parser.add_argument("--db", help="Database path (default: mnemosyne.db)")
    sub = parser.add_subparsers(dest="command")

    p_ingest = sub.add_parser("ingest", help="Ingest files or directories")
    p_ingest.add_argument("path", help="File or directory to ingest")
    p_ingest.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
    p_ingest.add_argument("--overlap", type=int, default=DEFAULT_CHUNK_OVERLAP)

    p_query = sub.add_parser("query", help="Search the archive")
    p_query.add_argument("text", help="Search query")
    p_query.add_argument("--limit", type=int, default=10)

    p_list = sub.add_parser("list", help="List documents in archive")
    p_list.add_argument("--limit", type=int, default=50)

    sub.add_parser("stats", help="Show archive statistics")

    p_doc = sub.add_parser("doc", help="Show document by ID")
    p_doc.add_argument("id", type=int, help="Document ID")

    args = parser.parse_args()

    # Dispatch table instead of an if/elif chain; unknown/absent command
    # falls through to the help text, as before.
    handlers = {
        "ingest": cmd_ingest,
        "query": cmd_query,
        "list": cmd_list,
        "stats": cmd_stats,
        "doc": cmd_doc,
    }
    handler = handlers.get(args.command)
    if handler is None:
        parser.print_help()
    else:
        handler(args)


if __name__ == "__main__":
    main()
|
||||
228
mnemosyne/index.py
Normal file
228
mnemosyne/index.py
Normal file
@@ -0,0 +1,228 @@
|
||||
"""
|
||||
Mnemosyne Holographic Index
|
||||
|
||||
Query interface: keyword search (FTS5) + semantic search (embedding similarity).
|
||||
Merges results with reciprocal rank fusion.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import math
|
||||
from typing import Optional
|
||||
from .ingest import get_db, DEFAULT_DB_PATH
|
||||
|
||||
|
||||
def keyword_search(
    query: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Full-text search using FTS5 with BM25 scoring.

    Args:
        query: FTS5 match expression; plain words are implicitly AND-ed.
        db_path: SQLite database file to search.
        limit: Maximum number of chunks returned.

    Returns:
        List of {chunk_id, doc_id, content, source, title, metadata,
        score, method} dicts, best match first.

    Raises:
        sqlite3.OperationalError: when `query` is not valid FTS5 syntax
            (e.g. an unbalanced quote).
    """
    conn = get_db(db_path)
    try:
        # FTS5's `rank` column is the BM25 score; more negative means a
        # better match, hence ORDER BY rank ascending.
        rows = conn.execute("""
            SELECT
                c.id as chunk_id,
                c.doc_id,
                c.content,
                d.source,
                d.title,
                d.metadata,
                rank as bm25_score
            FROM chunks_fts fts
            JOIN chunks c ON c.id = fts.rowid
            JOIN documents d ON d.id = c.doc_id
            WHERE chunks_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """, (query, limit)).fetchall()
    finally:
        # Close even when MATCH raises on malformed query syntax — the
        # previous version leaked the connection in that case.
        conn.close()

    return [
        {
            "chunk_id": row[0],
            "doc_id": row[1],
            "content": row[2],
            "source": row[3],
            "title": row[4],
            "metadata": json.loads(row[5]) if row[5] else {},
            "score": abs(row[6]),  # BM25 is negative; abs for ranking display
            "method": "keyword",
        }
        for row in rows
    ]
|
||||
|
||||
|
||||
def semantic_search(
    query_embedding: list[float],
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
) -> list[dict]:
    """Cosine-similarity search over pre-computed chunk embeddings.

    Embeddings are stored in chunks.embedding as a BLOB of packed float32
    values (4 bytes each, native byte order via struct "f").

    Args:
        query_embedding: Query vector; must match the stored dimension.
        db_path: SQLite database file to search.
        limit: Maximum number of chunks returned.

    Returns:
        Top-`limit` chunks by cosine similarity (highest first), or []
        when no chunk has an embedding at all.
    """
    import struct

    conn = get_db(db_path)
    try:
        # Fast exit for keyword-only archives.
        has_embeddings = conn.execute(
            "SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL"
        ).fetchone()[0]
        if has_embeddings == 0:
            return []

        rows = conn.execute("""
            SELECT
                c.id as chunk_id,
                c.doc_id,
                c.content,
                c.embedding,
                d.source,
                d.title,
                d.metadata
            FROM chunks c
            JOIN documents d ON d.id = c.doc_id
            WHERE c.embedding IS NOT NULL
        """).fetchall()
    finally:
        # Always release the connection, even if a query raises
        # (previously leaked on any exception).
        conn.close()

    dim = len(query_embedding)
    query_norm = math.sqrt(sum(x * x for x in query_embedding)) or 1.0

    results = []
    for row in rows:
        emb_bytes = row[3]
        n_floats = len(emb_bytes) // 4  # float32 = 4 bytes each
        if n_floats != dim:
            # Dimension mismatch (embedding from a different model/backend):
            # zip() below would silently truncate and yield a bogus
            # similarity, so skip the row instead.
            continue
        emb = struct.unpack(f"{n_floats}f", emb_bytes)

        # Cosine similarity; `or 1.0` guards against a zero vector.
        dot = sum(a * b for a, b in zip(query_embedding, emb))
        emb_norm = math.sqrt(sum(x * x for x in emb)) or 1.0
        similarity = dot / (query_norm * emb_norm)

        results.append({
            "chunk_id": row[0],
            "doc_id": row[1],
            "content": row[2],
            "source": row[4],
            "title": row[5],
            "metadata": json.loads(row[6]) if row[6] else {},
            "score": similarity,
            "method": "semantic",
        })

    results.sort(key=lambda r: r["score"], reverse=True)
    return results[:limit]
|
||||
|
||||
|
||||
def reciprocal_rank_fusion(
    keyword_results: list[dict],
    semantic_results: list[dict],
    k: int = 60,
    limit: int = 10,
) -> list[dict]:
    """Merge keyword and semantic rankings with Reciprocal Rank Fusion.

    RRF score of a chunk = sum over the lists it appears in of
    1 / (k + rank), with rank starting at 1.

    Args:
        keyword_results: Ranked dicts (best first), each carrying "chunk_id".
        semantic_results: Same shape, from the semantic ranking.
        k: RRF damping constant (60 is the value from the RRF paper).
        limit: Maximum number of fused results.

    Returns:
        Fused results, best first, each annotated with "rrf_score" and
        "methods" (subset of ["keyword", "semantic"]).
    """
    rrf_scores: dict[int, float] = {}
    chunk_map: dict[int, dict] = {}

    for rank, result in enumerate(keyword_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result

    for rank, result in enumerate(semantic_results):
        cid = result["chunk_id"]
        rrf_scores[cid] = rrf_scores.get(cid, 0) + 1.0 / (k + rank + 1)
        chunk_map[cid] = result

    # Build membership sets once — the previous version re-scanned both
    # result lists with any(...) for every output row (O(n^2)).
    keyword_ids = {r["chunk_id"] for r in keyword_results}
    semantic_ids = {r["chunk_id"] for r in semantic_results}

    # Sort by RRF score, best first.
    merged = sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)

    results = []
    for cid, score in merged[:limit]:
        entry = chunk_map[cid].copy()
        entry["rrf_score"] = score
        entry["methods"] = []
        if cid in keyword_ids:
            entry["methods"].append("keyword")
        if cid in semantic_ids:
            entry["methods"].append("semantic")
        results.append(entry)

    return results
|
||||
|
||||
|
||||
def query(
    text: str,
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 10,
    query_embedding: Optional[list[float]] = None,
) -> list[dict]:
    """Unified search entry point.

    Always runs keyword (FTS5) search. When `query_embedding` is supplied
    and the archive actually contains embeddings, also runs semantic search
    and fuses the two rankings with RRF; otherwise keyword results are
    returned as-is.
    """
    by_keyword = keyword_search(text, db_path=db_path, limit=limit)

    if query_embedding is None:
        return by_keyword

    by_meaning = semantic_search(query_embedding, db_path=db_path, limit=limit)
    if not by_meaning:
        # No embeddings stored — keyword-only fallback.
        return by_keyword
    return reciprocal_rank_fusion(by_keyword, by_meaning, limit=limit)
|
||||
|
||||
|
||||
def get_document(doc_id: int, db_path: str = DEFAULT_DB_PATH) -> Optional[dict]:
    """Fetch one full document row by primary key; None when absent."""
    conn = get_db(db_path)
    row = conn.execute(
        "SELECT id, source, title, content, metadata, ingested_at FROM documents WHERE id = ?",
        (doc_id,),
    ).fetchone()
    conn.close()

    if row is None:
        return None

    rid, source, title, content, metadata, ingested_at = row
    return {
        "id": rid,
        "source": source,
        "title": title,
        "content": content,
        # metadata column may be NULL/empty for old rows.
        "metadata": json.loads(metadata) if metadata else {},
        "ingested_at": ingested_at,
    }
|
||||
|
||||
|
||||
def list_documents(
    db_path: str = DEFAULT_DB_PATH,
    limit: int = 50,
    offset: int = 0,
) -> list[dict]:
    """Newest-first listing of documents with per-document chunk counts."""
    conn = get_db(db_path)
    rows = conn.execute("""
        SELECT d.id, d.source, d.title, d.ingested_at,
        COUNT(c.id) as chunk_count
        FROM documents d
        LEFT JOIN chunks c ON c.doc_id = d.id
        GROUP BY d.id
        ORDER BY d.ingested_at DESC
        LIMIT ? OFFSET ?
    """, (limit, offset)).fetchall()
    conn.close()

    # Column order matches the SELECT above.
    keys = ("id", "source", "title", "ingested_at", "chunks")
    return [dict(zip(keys, row)) for row in rows]
|
||||
267
mnemosyne/ingest.py
Normal file
267
mnemosyne/ingest.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""
|
||||
Mnemosyne Ingestion Pipeline
|
||||
|
||||
Accepts text/JSON/markdown inputs, chunks them with overlap,
|
||||
stores in local SQLite + FTS5 for keyword search.
|
||||
Embedding backend is pluggable (compute locally or skip).
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# Default chunking parameters, measured in characters (not tokens).
DEFAULT_CHUNK_SIZE = 512
DEFAULT_CHUNK_OVERLAP = 64
# Database file is created in the current working directory by default.
DEFAULT_DB_PATH = "mnemosyne.db"
|
||||
|
||||
|
||||
def get_db(db_path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
    """Open or create the Mnemosyne SQLite database with FTS5 tables.

    Idempotent: all DDL uses IF NOT EXISTS, so it is safe to run on every
    connection. The caller owns the returned connection and must close it.

    Schema: `documents` (raw docs, deduplicated via UNIQUE doc_hash),
    `chunks` (overlapping windows, cascade-deleted with their document),
    and `chunks_fts` — an external-content FTS5 table over chunks.content
    kept in sync by the insert/delete/update triggers below.
    """
    conn = sqlite3.connect(db_path)
    # WAL allows concurrent readers while a writer is active.
    conn.execute("PRAGMA journal_mode=WAL")
    # Required for chunks.doc_id ON DELETE CASCADE to actually fire.
    conn.execute("PRAGMA foreign_keys=ON")

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_hash TEXT UNIQUE NOT NULL,
            source TEXT NOT NULL,
            title TEXT,
            content TEXT NOT NULL,
            metadata TEXT DEFAULT '{}',
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_id INTEGER NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
            chunk_index INTEGER NOT NULL,
            content TEXT NOT NULL,
            embedding BLOB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(doc_id, chunk_index)
        );

        CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
            content,
            content=chunks,
            content_rowid=id,
            tokenize='porter unicode61'
        );

        -- Triggers to keep FTS5 in sync
        CREATE TRIGGER IF NOT EXISTS chunks_ai AFTER INSERT ON chunks BEGIN
            INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
        END;

        CREATE TRIGGER IF NOT EXISTS chunks_ad AFTER DELETE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, content)
            VALUES('delete', old.id, old.content);
        END;

        CREATE TRIGGER IF NOT EXISTS chunks_au AFTER UPDATE ON chunks BEGIN
            INSERT INTO chunks_fts(chunks_fts, rowid, content)
            VALUES('delete', old.id, old.content);
            INSERT INTO chunks_fts(rowid, content) VALUES (new.id, new.content);
        END;
    """)
    conn.commit()
    return conn
|
||||
|
||||
|
||||
def chunk_text(
    text: str,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[str]:
    """Split text into overlapping chunks by character count.

    Tries to break at paragraph > sentence > word boundaries; a boundary
    is only used when it falls in the second half of the window, otherwise
    the chunk is cut hard at `chunk_size`.

    Args:
        text: Text to split.
        chunk_size: Target chunk length in characters.
        overlap: Characters shared between consecutive chunks.

    Returns:
        Stripped, non-empty chunks in document order. Text that fits in a
        single chunk is returned unmodified as a one-element list.
    """
    if len(text) <= chunk_size:
        return [text]

    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end >= len(text):
            # Final window: take the tail, but don't emit a blank chunk
            # when the remainder is pure whitespace (the previous version
            # appended "" unconditionally).
            tail = text[start:].strip()
            if tail:
                chunks.append(tail)
            break

        segment = text[start:end]

        # Prefer a paragraph break in the second half of the window.
        last_para = segment.rfind("\n\n")
        if last_para > chunk_size * 0.5:
            end = start + last_para + 2
        else:
            # Then a sentence boundary...
            last_period = max(
                segment.rfind(". "),
                segment.rfind("! "),
                segment.rfind("? "),
                segment.rfind(".\n"),
            )
            if last_period > chunk_size * 0.5:
                end = start + last_period + 2
            else:
                # ...and finally fall back to a word boundary.
                last_space = segment.rfind(" ")
                if last_space > chunk_size * 0.5:
                    end = start + last_space + 1

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        # Step forward keeping `overlap` chars; max() guarantees progress
        # even when overlap is as large as the effective step.
        start = max(start + 1, end - overlap)

    return chunks
|
||||
|
||||
|
||||
def _hash_content(content: str, source: str) -> str:
    """Deterministic 32-hex-char dedup key over the (source, content) pair."""
    digest = hashlib.sha256()
    digest.update(f"{source}:{content}".encode())
    return digest.hexdigest()[:32]
|
||||
|
||||
|
||||
def ingest_text(
    content: str,
    source: str = "inline",
    title: Optional[str] = None,
    metadata: Optional[dict] = None,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a single text document into the archive.

    Deduplicates on a sha256 digest of "source:content" (see
    _hash_content); document row and all its chunks are committed in one
    transaction.

    Returns:
        The new doc_id, or None when this (source, content) pair was
        already ingested.
    """
    doc_hash = _hash_content(content, source)
    conn = get_db(db_path)
    try:
        # Deduplicate before inserting anything.
        existing = conn.execute(
            "SELECT id FROM documents WHERE doc_hash = ?", (doc_hash,)
        ).fetchone()
        if existing:
            return None

        cursor = conn.execute(
            "INSERT INTO documents (doc_hash, source, title, content, metadata) VALUES (?, ?, ?, ?, ?)",
            (doc_hash, source, title, content, json.dumps(metadata or {})),
        )
        doc_id = cursor.lastrowid

        # FTS rows are maintained automatically by the chunks_ai trigger.
        for i, chunk in enumerate(chunk_text(content, chunk_size, chunk_overlap)):
            conn.execute(
                "INSERT INTO chunks (doc_id, chunk_index, content) VALUES (?, ?, ?)",
                (doc_id, i, chunk),
            )

        conn.commit()
        return doc_id
    finally:
        # try/finally fixes a connection leak when any statement raises
        # between open and close.
        conn.close()
|
||||
|
||||
|
||||
def ingest_file(
    path: str,
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> Optional[int]:
    """Ingest a file (text, markdown, JSON) into the archive.

    For JSON objects, text is taken from the first of the fields
    body/text/content/message; JSON arrays are ingested one record per
    document. Anything else is stored as its pretty-printed JSON form.

    Returns:
        doc_id of the (first) newly ingested document, or None when
        everything was a duplicate.

    Raises:
        FileNotFoundError: if `path` does not exist.
        json.JSONDecodeError: if a .json file is malformed.
    """
    p = Path(path)
    if not p.exists():
        raise FileNotFoundError(f"File not found: {path}")

    source = str(p.resolve())
    title = p.stem

    if p.suffix.lower() == ".json":
        # Explicit encoding: read_text() without one is platform-dependent.
        data = json.loads(p.read_text(encoding="utf-8"))
        if isinstance(data, str):
            content = data
        elif isinstance(data, dict):
            content = data.get("body") or data.get("text") or data.get("content") or data.get("message") or json.dumps(data, indent=2)
            title = data.get("title", title)
        elif isinstance(data, list):
            # Array of records — ingest each as a separate doc
            ids = []
            for item in data:
                if isinstance(item, str):
                    rid = ingest_text(item, source=source, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                elif isinstance(item, dict):
                    text_content = item.get("body") or item.get("text") or item.get("content") or json.dumps(item, indent=2)
                    item_title = item.get("title", title)
                    rid = ingest_text(text_content, source=source, title=item_title, metadata=item, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                else:
                    # Scalars / nested arrays: store their JSON form rather
                    # than crash on .get() (previous version assumed dict).
                    rid = ingest_text(json.dumps(item, indent=2), source=source, title=title, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
                if rid is not None:
                    ids.append(rid)
            return ids[0] if ids else None
        else:
            content = json.dumps(data, indent=2)
    else:
        content = p.read_text(encoding="utf-8", errors="replace")

    return ingest_text(content, source=source, title=title, db_path=db_path, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
|
||||
|
||||
|
||||
def ingest_directory(
    dir_path: str,
    extensions: tuple[str, ...] = (".txt", ".md", ".json", ".py", ".js", ".yaml", ".yml"),
    db_path: str = DEFAULT_DB_PATH,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    chunk_overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> dict:
    """Walk a directory tree and ingest every file with a matching extension.

    Files under hidden directories or __pycache__ are skipped. Per-file
    failures are collected rather than aborting the walk.

    Returns:
        {"ingested": N, "skipped": N, "errors": [{"file", "error"}, ...]}
    """
    root = Path(dir_path)
    if not root.is_dir():
        raise NotADirectoryError(f"Not a directory: {dir_path}")

    summary = {"ingested": 0, "skipped": 0, "errors": []}
    for candidate in sorted(root.rglob("*")):
        if not candidate.is_file() or candidate.suffix.lower() not in extensions:
            continue
        # Skip hidden dirs and __pycache__ anywhere under the root.
        relative_parts = candidate.relative_to(root).parts
        if any(part == "__pycache__" or part.startswith(".") for part in relative_parts):
            continue
        try:
            new_id = ingest_file(
                str(candidate),
                db_path=db_path,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
        except Exception as exc:
            summary["errors"].append({"file": str(candidate), "error": str(exc)})
        else:
            # None means the file was a content-identical duplicate.
            summary["ingested" if new_id is not None else "skipped"] += 1

    return summary
|
||||
|
||||
|
||||
def get_stats(db_path: str = DEFAULT_DB_PATH) -> dict:
    """Count documents, chunks, and distinct sources in the archive."""
    conn = get_db(db_path)
    counts = {
        "documents": conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0],
        "chunks": conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0],
        "sources": conn.execute("SELECT COUNT(DISTINCT source) FROM documents").fetchone()[0],
    }
    conn.close()
    return counts
|
||||
205
tests/test_mnemosyne.py
Normal file
205
tests/test_mnemosyne.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""
|
||||
Tests for Mnemosyne — The Living Holographic Archive.
|
||||
|
||||
Round-trip: ingest sample docs → query → verify results.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import pytest
|
||||
|
||||
# Add parent to path for imports
|
||||
import sys
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from mnemosyne.ingest import (
|
||||
chunk_text, ingest_text, ingest_file, ingest_directory,
|
||||
get_stats, get_db,
|
||||
)
|
||||
from mnemosyne.index import keyword_search, query, list_documents, get_document
|
||||
|
||||
|
||||
@pytest.fixture
def db_path(tmp_path):
    """Per-test path to a fresh SQLite database (file is created lazily by get_db)."""
    return str(tmp_path / "test_mnemosyne.db")
|
||||
|
||||
|
||||
@pytest.fixture
def sample_docs(tmp_path):
    """Write four sample files (txt, md, JSON object, JSON array) into
    tmp_path and return their string paths keyed by kind."""
    docs = {}

    # Plain text
    txt = tmp_path / "alice.txt"
    txt.write_text(
        "Alice was beginning to get very tired of sitting by her sister on the bank. "
        "She had peeped into the book her sister was reading, but it had no pictures "
        "or conversations in it. 'And what is the use of a book,' thought Alice, "
        "'without pictures or conversations?'"
    )
    docs["txt"] = str(txt)

    # Markdown
    md = tmp_path / "readme.md"
    md.write_text(
        "# Project Mnemosyne\n\n"
        "Mnemosyne is a sovereign holographic archive system.\n\n"
        "## Features\n\n"
        "- Full-text search with FTS5\n"
        "- Semantic search with embeddings\n"
        "- Reciprocal rank fusion for hybrid results\n"
        "- SQLite-backed, no external dependencies\n"
    )
    docs["md"] = str(md)

    # JSON object — exercises ingest_file's body/title extraction path
    js = tmp_path / "data.json"
    js.write_text(json.dumps({
        "title": "The Sovereignty Principle",
        "body": "Every person has the right to run their own intelligence on their own hardware, "
        "answerable to no one. This is the foundation of digital sovereignty.",
    }))
    docs["json"] = str(js)

    # JSON array — exercises ingest_file's one-document-per-record path
    js_arr = tmp_path / "records.json"
    js_arr.write_text(json.dumps([
        {"title": "Record A", "text": "First record about Bitcoin and the blockchain."},
        {"title": "Record B", "text": "Second record about AI and language models."},
    ]))
    docs["json_array"] = str(js_arr)

    return docs
|
||||
|
||||
|
||||
class TestChunking:
    """Unit tests for chunk_text boundary and overlap behavior."""

    def test_short_text_no_split(self):
        # Text at or below chunk_size must come back unmodified, as one chunk.
        text = "Short text."
        chunks = chunk_text(text, chunk_size=100)
        assert len(chunks) == 1
        assert chunks[0] == text

    def test_long_text_splits(self):
        text = "word " * 200  # 1000 chars
        chunks = chunk_text(text, chunk_size=200, overlap=20)
        assert len(chunks) > 1

    def test_overlap_exists(self):
        # Two homogeneous halves; the overlap window should straddle the seam.
        text = "aaa " * 100 + "bbb " * 100
        chunks = chunk_text(text, chunk_size=200, overlap=50)
        # Some chunks should contain both aaa and bbb due to overlap
        cross_chunks = [c for c in chunks if "aaa" in c and "bbb" in c]
        assert len(cross_chunks) > 0
|
||||
|
||||
|
||||
class TestIngestion:
    """Ingestion paths (text, file, JSON, directory) against a temp DB."""

    def test_ingest_text_returns_id(self, db_path):
        doc_id = ingest_text("Hello world", source="test", db_path=db_path)
        assert doc_id is not None
        assert doc_id > 0

    def test_ingest_text_dedup(self, db_path):
        # An identical (source, content) pair must be rejected the second time.
        doc_id1 = ingest_text("Hello world", source="test", db_path=db_path)
        doc_id2 = ingest_text("Hello world", source="test", db_path=db_path)
        assert doc_id1 is not None
        assert doc_id2 is None  # duplicate

    def test_ingest_file_txt(self, db_path, sample_docs):
        doc_id = ingest_file(sample_docs["txt"], db_path=db_path)
        assert doc_id is not None

    def test_ingest_file_json(self, db_path, sample_docs):
        doc_id = ingest_file(sample_docs["json"], db_path=db_path)
        assert doc_id is not None

    def test_ingest_file_json_array(self, db_path, sample_docs):
        # A JSON array becomes one document per record.
        doc_id = ingest_file(sample_docs["json_array"], db_path=db_path)
        assert doc_id is not None
        # Should have ingested 2 records
        stats = get_stats(db_path)
        assert stats["documents"] == 2

    def test_ingest_directory(self, db_path, sample_docs, tmp_path):
        # sample_docs wrote four files into tmp_path; all should ingest cleanly.
        result = ingest_directory(str(tmp_path), db_path=db_path)
        assert result["ingested"] >= 4
        assert len(result["errors"]) == 0

    def test_stats(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        stats = get_stats(db_path)
        assert stats["documents"] == 2
        assert stats["chunks"] >= 2
|
||||
|
||||
|
||||
class TestSearch:
    """Keyword/query search and document retrieval behavior."""

    def test_keyword_search(self, db_path, sample_docs):
        ingest_file(sample_docs["md"], db_path=db_path)
        results = keyword_search("Mnemosyne archive", db_path=db_path)
        assert len(results) > 0
        # Top hit should mention at least one of the query terms.
        assert "mnemosyne" in results[0]["content"].lower() or "archive" in results[0]["content"].lower()

    def test_query_returns_results(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        results = query("Alice tired bank", db_path=db_path)
        assert len(results) > 0

    def test_query_empty_db(self, db_path):
        # Searching an empty archive must not raise.
        results = query("anything", db_path=db_path)
        assert results == []

    def test_query_no_match(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        results = query("xyzzyplugh quantum entanglement", db_path=db_path)
        assert results == []

    def test_list_documents(self, db_path, sample_docs):
        ingest_file(sample_docs["txt"], db_path=db_path)
        ingest_file(sample_docs["md"], db_path=db_path)
        docs = list_documents(db_path=db_path)
        assert len(docs) == 2
        # Each listing row carries its chunk count.
        assert all("chunks" in d for d in docs)

    def test_get_document(self, db_path, sample_docs):
        doc_id = ingest_file(sample_docs["txt"], db_path=db_path)
        doc = get_document(doc_id, db_path=db_path)
        assert doc is not None
        assert "Alice" in doc["content"]
        # Title defaults to the file stem ("alice.txt" -> "alice").
        assert doc["title"] == "alice"

    def test_get_document_not_found(self, db_path):
        doc = get_document(9999, db_path=db_path)
        assert doc is None
|
||||
|
||||
|
||||
class TestRoundTrip:
    """Full round-trip: ingest → query → verify recall."""

    def test_round_trip(self, db_path, sample_docs, tmp_path):
        # Ingest all sample docs (sample_docs fixture wrote them to tmp_path)
        result = ingest_directory(str(tmp_path), db_path=db_path)
        assert result["ingested"] >= 4

        # Verify stats
        stats = get_stats(db_path)
        assert stats["documents"] >= 4
        assert stats["chunks"] > 0

        # Query for Alice
        results = query("Alice pictures conversations", db_path=db_path)
        assert len(results) > 0
        assert any("alice" in r.get("title", "").lower() or "Alice" in r["content"] for r in results)

        # Query for Mnemosyne
        results = query("Mnemosyne sovereign archive", db_path=db_path)
        assert len(results) > 0

        # Query for sovereignty
        results = query("sovereignty intelligence hardware", db_path=db_path)
        assert len(results) > 0

        # List all documents
        docs = list_documents(db_path=db_path)
        assert len(docs) >= 4
|
||||
Reference in New Issue
Block a user