Compare commits

...

6 Commits

Author SHA1 Message Date
98cdc34a36 feat: add ingest-dir CLI command (#1275)
mnemosyne ingest-dir <path> [--ext md,txt] [--topics topic1,topic2]
2026-04-12 11:51:56 +00:00
63ac52dc24 feat: export ingest_file and ingest_directory 2026-04-12 11:47:55 +00:00
25f6ffc050 feat: add file and directory ingestion pipeline (#1275)
- ingest_file() reads a single file, extracts title from headings, chunks large files
- ingest_directory() walks directory tree, ingests matching files
- Dedup via source_ref (file path + mtime)
- Chunking at heading and paragraph boundaries for large files
2026-04-12 11:47:20 +00:00
0f87258a1e test: verify PUT API works 2026-04-12 11:46:20 +00:00
72d9c1a303 [claude] Mnemosyne Memory Resonance — latent connection discovery (#1272) (#1274)
2026-04-12 11:18:54 +00:00
fd8f82315c [claude] Mnemosyne archive snapshots — backup and restore (#1268) (#1270)
2026-04-12 09:49:31 +00:00
6 changed files with 930 additions and 4 deletions

View File

@@ -13,7 +13,7 @@ from __future__ import annotations
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
-from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
+from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event, ingest_file, ingest_directory
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
@@ -27,6 +27,8 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"ingest_file",
"ingest_directory",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",

View File

@@ -1105,6 +1105,241 @@ class MnemosyneArchive:
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
return steps
# ─── Snapshot / Backup ────────────────────────────────────
def _snapshot_dir(self) -> Path:
"""Return (and create) the snapshots directory next to the archive."""
d = self.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _snapshot_filename(timestamp: str, label: str) -> str:
"""Build a deterministic snapshot filename."""
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
return f"{timestamp}_{safe_label}.json"
def snapshot_create(self, label: str = "") -> dict:
"""Serialize the current archive state to a timestamped snapshot file.
Args:
label: Human-readable label for the snapshot (optional).
Returns:
Dict with keys: snapshot_id, label, created_at, entry_count, path
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = self._snapshot_filename(timestamp, label)
snapshot_id = filename[:-5] # strip .json
snap_path = self._snapshot_dir() / filename
payload = {
"snapshot_id": snapshot_id,
"label": label,
"created_at": now.isoformat(),
"entry_count": len(self._entries),
"archive_path": str(self.path),
"entries": [e.to_dict() for e in self._entries.values()],
}
with open(snap_path, "w") as f:
json.dump(payload, f, indent=2)
return {
"snapshot_id": snapshot_id,
"label": label,
"created_at": payload["created_at"],
"entry_count": payload["entry_count"],
"path": str(snap_path),
}
def snapshot_list(self) -> list[dict]:
"""List available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
"""
snap_dir = self._snapshot_dir()
snapshots = []
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
try:
with open(snap_path) as f:
data = json.load(f)
snapshots.append({
"snapshot_id": data.get("snapshot_id", snap_path.stem),
"label": data.get("label", ""),
"created_at": data.get("created_at", ""),
"entry_count": data.get("entry_count", len(data.get("entries", []))),
"path": str(snap_path),
})
except (json.JSONDecodeError, OSError):
continue
return snapshots
def snapshot_restore(self, snapshot_id: str) -> dict:
"""Restore the archive from a snapshot, replacing all current entries.
Args:
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
Returns:
Dict with keys: snapshot_id, restored_count, previous_count
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
previous_count = len(self._entries)
self._entries = {}
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
self._save()
return {
"snapshot_id": snapshot_id,
"restored_count": len(self._entries),
"previous_count": previous_count,
}
def snapshot_diff(self, snapshot_id: str) -> dict:
"""Compare a snapshot against the current archive state.
Args:
snapshot_id: The snapshot_id to compare against current state.
Returns:
Dict with keys:
- snapshot_id: str
- added: list of {id, title} — in current, not in snapshot
- removed: list of {id, title} — in snapshot, not in current
- modified: list of {id, title, snapshot_hash, current_hash}
- unchanged: int — count of identical entries
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
snap_entries: dict[str, dict] = {}
for entry_data in data.get("entries", []):
snap_entries[entry_data["id"]] = entry_data
current_ids = set(self._entries.keys())
snap_ids = set(snap_entries.keys())
added = []
for eid in current_ids - snap_ids:
e = self._entries[eid]
added.append({"id": e.id, "title": e.title})
removed = []
for eid in snap_ids - current_ids:
snap_e = snap_entries[eid]
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
modified = []
unchanged = 0
for eid in current_ids & snap_ids:
current_hash = self._entries[eid].content_hash
snap_hash = snap_entries[eid].get("content_hash")
if current_hash != snap_hash:
modified.append({
"id": eid,
"title": self._entries[eid].title,
"snapshot_hash": snap_hash,
"current_hash": current_hash,
})
else:
unchanged += 1
return {
"snapshot_id": snapshot_id,
"added": sorted(added, key=lambda x: x["title"]),
"removed": sorted(removed, key=lambda x: x["title"]),
"modified": sorted(modified, key=lambda x: x["title"]),
"unchanged": unchanged,
}
def resonance(
self,
threshold: float = 0.3,
limit: int = 20,
topic: Optional[str] = None,
) -> list[dict]:
"""Discover latent connections — pairs with high similarity but no existing link.
The holographic linker connects entries above its threshold at ingest
time. ``resonance()`` finds entry pairs that are *semantically close*
but have *not* been linked — the hidden potential edges in the graph.
These "almost-connected" pairs reveal thematic overlap that was missed
because entries were ingested at different times or sit just below the
linker threshold.
Args:
threshold: Minimum similarity score to surface a pair (default 0.3).
Pairs already linked are excluded regardless of score.
limit: Maximum number of pairs to return (default 20).
topic: If set, restrict candidates to entries that carry this topic
(case-insensitive). Both entries in a pair must match.
Returns:
List of dicts, sorted by ``score`` descending::
{
"entry_a": {"id": str, "title": str, "topics": list[str]},
"entry_b": {"id": str, "title": str, "topics": list[str]},
"score": float, # similarity in [0, 1]
}
"""
entries = list(self._entries.values())
if topic:
topic_lower = topic.lower()
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
results: list[dict] = []
for i, entry_a in enumerate(entries):
for entry_b in entries[i + 1:]:
# Skip pairs that are already linked
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
continue
score = self.linker.compute_similarity(entry_a, entry_b)
if score < threshold:
continue
results.append({
"entry_a": {
"id": entry_a.id,
"title": entry_a.title,
"topics": entry_a.topics,
},
"entry_b": {
"id": entry_b.id,
"title": entry_b.title,
"topics": entry_b.topics,
},
"score": round(score, 4),
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -6,7 +6,10 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
-mnemosyne fading, mnemosyne vibrant
+mnemosyne fading, mnemosyne vibrant,
+mnemosyne snapshot create|list|restore|diff,
+mnemosyne resonance,
+mnemosyne ingest-dir
"""
from __future__ import annotations
@@ -17,7 +20,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
-from nexus.mnemosyne.ingest import ingest_event
+from nexus.mnemosyne.ingest import ingest_event, ingest_file, ingest_directory
def cmd_stats(args):
@@ -63,6 +66,21 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
archive = MnemosyneArchive()
    exts = None
    if args.extensions:
        # Accept "md,txt" as well as ".md,.txt": ingest_directory matches
        # Path.suffix, which always includes the leading dot.
        exts = {
            e if e.startswith(".") else f".{e}"
            for e in (part.strip().lower() for part in args.extensions.split(","))
            if e
        }
stats = ingest_directory(
archive,
dir_path=args.path,
extensions=exts,
topics=args.topics.split(",") if args.topics else [],
)
print(f"Scanned: {stats['files_scanned']} files")
print(f"Ingested: {stats['files_ingested']} files -> {stats['entries_added']} entries")
print(f"Skipped: {stats['skipped']} files")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
@@ -303,6 +321,86 @@ def cmd_fading(args):
print()
def cmd_snapshot(args):
archive = MnemosyneArchive()
if args.snapshot_cmd == "create":
result = archive.snapshot_create(label=args.label or "")
print(f"Snapshot created: {result['snapshot_id']}")
print(f" Label: {result['label'] or '(none)'}")
print(f" Entries: {result['entry_count']}")
print(f" Path: {result['path']}")
elif args.snapshot_cmd == "list":
snapshots = archive.snapshot_list()
if not snapshots:
print("No snapshots found.")
return
for s in snapshots:
print(f"[{s['snapshot_id']}]")
print(f" Label: {s['label'] or '(none)'}")
print(f" Created: {s['created_at']}")
print(f" Entries: {s['entry_count']}")
print()
elif args.snapshot_cmd == "restore":
try:
result = archive.snapshot_restore(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Restored from snapshot: {result['snapshot_id']}")
print(f" Entries restored: {result['restored_count']}")
print(f" Previous count: {result['previous_count']}")
elif args.snapshot_cmd == "diff":
try:
diff = archive.snapshot_diff(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Diff vs snapshot: {diff['snapshot_id']}")
print(f" Added ({len(diff['added'])}): ", end="")
if diff["added"]:
print()
for e in diff["added"]:
print(f" + [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Removed ({len(diff['removed'])}): ", end="")
if diff["removed"]:
print()
for e in diff["removed"]:
print(f" - [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Modified({len(diff['modified'])}): ", end="")
if diff["modified"]:
print()
for e in diff["modified"]:
print(f" ~ [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Unchanged: {diff['unchanged']}")
else:
print(f"Unknown snapshot subcommand: {args.snapshot_cmd}")
sys.exit(1)
def cmd_resonance(args):
archive = MnemosyneArchive()
topic = args.topic if args.topic else None
pairs = archive.resonance(threshold=args.threshold, limit=args.limit, topic=topic)
if not pairs:
print("No resonant pairs found.")
return
for p in pairs:
a = p["entry_a"]
b = p["entry_b"]
print(f"Score: {p['score']:.4f}")
print(f" [{a['id'][:8]}] {a['title']}")
print(f" Topics: {', '.join(a['topics']) if a['topics'] else '(none)'}")
print(f" [{b['id'][:8]}] {b['title']}")
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
@@ -331,6 +429,12 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
    ig = sub.add_parser("ingest-dir", help="Ingest all files from a directory")
    ig.add_argument("path", help="Directory path to ingest")
    ig.add_argument("--ext", dest="extensions", default="", help="Comma-separated extensions, with or without leading dot (default: .md,.txt)")
    ig.add_argument("--topics", default="", help="Comma-separated topics to tag all entries")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
@@ -401,10 +505,28 @@ def main():
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
rs = sub.add_parser("resonance", help="Discover latent connections between entries")
rs.add_argument("-t", "--threshold", type=float, default=0.3, help="Minimum similarity score (default: 0.3)")
rs.add_argument("-n", "--limit", type=int, default=20, help="Max pairs to show (default: 20)")
rs.add_argument("--topic", default="", help="Restrict to entries with this topic")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
sn_sub.add_parser("list", help="List available snapshots")
sn_restore = sn_sub.add_parser("restore", help="Restore archive from a snapshot")
sn_restore.add_argument("snapshot_id", help="Snapshot ID to restore")
sn_diff = sn_sub.add_parser("diff", help="Show what changed since a snapshot")
sn_diff.add_argument("snapshot_id", help="Snapshot ID to compare against")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "snapshot" and not args.snapshot_cmd:
sn.print_help()
sys.exit(1)
dispatch = {
"stats": cmd_stats,
@@ -430,6 +552,9 @@ def main():
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
"ingest-dir": cmd_ingest_dir,
}
dispatch[args.command](args)
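
The parser wiring above maps onto invocations like the following; the paths, labels, and the snapshot ID are illustrative (the ID format follows _snapshot_filename's timestamp_label scheme):

mnemosyne ingest-dir ~/notes --ext md,txt --topics research,notes
mnemosyne snapshot create --label pre-cleanup
mnemosyne snapshot list
mnemosyne snapshot diff 20260412_115156_pre-cleanup
mnemosyne snapshot restore 20260412_115156_pre-cleanup
mnemosyne resonance -t 0.3 -n 10 --topic research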

View File

@@ -1,15 +1,25 @@
"""Ingestion pipeline — feeds data into the archive.
-Supports ingesting from MemPalace, raw events, and manual entries.
+Supports ingesting from MemPalace, raw events, files, and directories.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
# Default max chunk size in characters (roughly ~2000 tokens)
_DEFAULT_CHUNK_SIZE = 8000
# File extensions recognized for ingestion
_TEXT_EXTENSIONS = {".md", ".txt", ".rst", ".log", ".py", ".js", ".yaml", ".yml", ".json", ".toml", ".cfg", ".ini"}
_DEFAULT_EXTENSIONS = {".md", ".txt"}
def ingest_from_mempalace(
archive: MnemosyneArchive,
@@ -60,3 +70,179 @@ def ingest_event(
metadata=metadata or {},
)
return archive.add(entry)
def _extract_title(content: str, fallback: str = "Untitled") -> str:
"""Extract title from first markdown heading, or use fallback."""
for line in content.split("\n")[:10]:
line = line.strip()
m = re.match(r"^#{1,6}\s+(.+)$", line)
if m:
return m.group(1).strip()
for line in content.split("\n")[:5]:
line = line.strip()
if line and len(line) > 3:
return line[:120]
return fallback
def _chunk_content(content: str, max_size: int = _DEFAULT_CHUNK_SIZE) -> list[str]:
"""Split content into chunks at heading boundaries.
Splits on ## headings when content exceeds max_size.
Falls back to paragraph boundaries, then fixed-size splits.
"""
if len(content) <= max_size:
return [content]
chunks: list[str] = []
parts = re.split(r"(\n## )", content)
current = ""
for part in parts:
if len(current) + len(part) > max_size and current:
chunks.append(current.strip())
current = part
else:
current += part
if current.strip():
chunks.append(current.strip())
# If a single chunk is still too large, split on paragraphs
final_chunks: list[str] = []
for chunk in chunks:
if len(chunk) <= max_size:
final_chunks.append(chunk)
else:
paragraphs = chunk.split("\n\n")
para_current = ""
            for para in paragraphs:
                if len(para) > max_size:
                    # Last-resort fallback from the docstring: flush the
                    # accumulator, then hard-split the oversize paragraph
                    # at fixed offsets.
                    if para_current.strip():
                        final_chunks.append(para_current.strip())
                    para_current = ""
                    for start in range(0, len(para), max_size):
                        piece = para[start:start + max_size].strip()
                        if piece:
                            final_chunks.append(piece)
                    continue
                if len(para_current) + len(para) + 2 > max_size and para_current:
                    final_chunks.append(para_current.strip())
                    para_current = para
                else:
                    para_current = para_current + "\n\n" + para if para_current else para
if para_current.strip():
final_chunks.append(para_current.strip())
return final_chunks if final_chunks else [content[:max_size]]
def ingest_file(
archive: MnemosyneArchive,
file_path,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> list:
"""Ingest a single file into the archive.
Extracts title from first markdown heading (or filename).
Large files are chunked at heading boundaries.
Re-ingesting the same unchanged file returns existing entries (dedup via source_ref).
Args:
archive: The MnemosyneArchive to ingest into.
file_path: Path to the file.
source: Source label (default "file").
topics: Topic tags to attach to entries.
max_chunk_size: Maximum characters per chunk before splitting.
Returns:
List of ArchiveEntry objects created (or existing if deduped).
Raises:
FileNotFoundError: If file_path does not exist.
UnicodeDecodeError: If file cannot be decoded as UTF-8.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
stat = path.stat()
source_ref = f"{path.resolve()}:{int(stat.st_mtime)}"
    # Check if already ingested (same path + mtime). Chunked entries store
    # source_ref with a "#chunk{i}" suffix, so match bare and chunked refs.
    existing = [
        e for e in archive._entries.values()
        if e.source_ref == source_ref or (e.source_ref or "").startswith(f"{source_ref}#")
    ]
if existing:
return existing
content = path.read_text(encoding="utf-8")
if not content.strip():
return []
title = _extract_title(content, fallback=path.stem)
chunks = _chunk_content(content, max_chunk_size)
entries: list = []
for i, chunk in enumerate(chunks):
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1}/{len(chunks)})"
entry = ArchiveEntry(
title=chunk_title,
content=chunk,
source=source,
source_ref=source_ref if len(chunks) == 1 else f"{source_ref}#chunk{i}",
topics=topics or [],
metadata={
"file_path": str(path.resolve()),
"file_name": path.name,
"file_size": stat.st_size,
"file_mtime": stat.st_mtime,
"chunk_index": i,
"total_chunks": len(chunks),
},
)
archive.add(entry)
entries.append(entry)
return entries
def ingest_directory(
archive: MnemosyneArchive,
dir_path,
extensions: Optional[set[str]] = None,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
recursive: bool = True,
) -> dict:
"""Ingest all matching files from a directory tree.
Args:
archive: The MnemosyneArchive to ingest into.
dir_path: Root directory to scan.
extensions: File extensions to include (default: .md, .txt).
source: Source label for ingested entries.
topics: Topic tags to attach to all entries.
max_chunk_size: Maximum characters per chunk before splitting.
recursive: Whether to recurse into subdirectories.
Returns:
Dict with keys: files_scanned, files_ingested, entries_added, skipped
"""
root = Path(dir_path)
if not root.is_dir():
raise NotADirectoryError(f"Not a directory: {dir_path}")
exts = extensions or _DEFAULT_EXTENSIONS
stats = {"files_scanned": 0, "files_ingested": 0, "entries_added": 0, "skipped": 0}
pattern = "**/*" if recursive else "*"
for file_path in sorted(root.glob(pattern)):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in exts:
continue
stats["files_scanned"] += 1
try:
entries = ingest_file(archive, file_path, source=source, topics=topics, max_chunk_size=max_chunk_size)
if entries:
stats["files_ingested"] += 1
stats["entries_added"] += len(entries)
else:
stats["skipped"] += 1
except (UnicodeDecodeError, OSError):
stats["skipped"] += 1
return stats
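
A minimal sketch of the file-ingestion API above, assuming the package-default archive and a hypothetical notes directory:

```python
# Sketch only: the signatures and stats keys come from the diff above;
# the paths and topics are illustrative.
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_file, ingest_directory

archive = MnemosyneArchive()

# Single file: returns the ArchiveEntry objects created, or the existing
# entries if the same path + mtime was already ingested (dedup via source_ref).
entries = ingest_file(archive, "notes/design.md", topics=["design"])
print(len(entries), "entries")  # more than 1 only when the file was chunked

# Whole tree: recurses by default, skips non-matching extensions, and
# returns counters rather than entries.
stats = ingest_directory(archive, "notes", extensions={".md", ".txt"}, topics=["notes"])
print(stats["files_scanned"], stats["files_ingested"], stats["entries_added"], stats["skipped"])
```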

View File

@@ -0,0 +1,138 @@
"""Tests for MnemosyneArchive.resonance() — latent connection discovery."""
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _archive(tmp_path: Path) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=tmp_path / "archive.json", auto_embed=False)
def test_resonance_returns_unlinked_similar_pairs(tmp_path):
archive = _archive(tmp_path)
    # Two lexically similar Python entries and one unrelated entry; any links
    # created at ingest time are cleared below so resonance sees unlinked pairs.
e1 = ingest_event(archive, title="Python automation scripts", content="Automating tasks with Python scripts")
e2 = ingest_event(archive, title="Python automation tools", content="Automating tasks with Python tools")
e3 = ingest_event(archive, title="Cooking recipes pasta", content="How to make pasta carbonara at home")
# Force-remove any existing links so we can test resonance independently
e1.links = []
e2.links = []
e3.links = []
archive._save()
pairs = archive.resonance(threshold=0.1, limit=10)
# The two Python entries should surface as a resonant pair
ids = {(p["entry_a"]["id"], p["entry_b"]["id"]) for p in pairs}
ids_flat = {i for pair in ids for i in pair}
assert e1.id in ids_flat and e2.id in ids_flat, "Semantically similar entries should appear as resonant pair"
def test_resonance_excludes_already_linked_pairs(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Python automation scripts", content="Automating tasks with Python scripts")
e2 = ingest_event(archive, title="Python automation tools", content="Automating tasks with Python tools")
# Manually link them
e1.links = [e2.id]
e2.links = [e1.id]
archive._save()
pairs = archive.resonance(threshold=0.0, limit=100)
for p in pairs:
a_id = p["entry_a"]["id"]
b_id = p["entry_b"]["id"]
assert not (a_id == e1.id and b_id == e2.id), "Already-linked pair should be excluded"
assert not (a_id == e2.id and b_id == e1.id), "Already-linked pair should be excluded"
def test_resonance_sorted_by_score_descending(tmp_path):
archive = _archive(tmp_path)
ingest_event(archive, title="Python coding automation", content="Automating Python coding workflows")
ingest_event(archive, title="Python scripts automation", content="Automation via Python scripting")
ingest_event(archive, title="Cooking food at home", content="Home cooking and food preparation")
# Clear all links to test resonance
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=10)
scores = [p["score"] for p in pairs]
assert scores == sorted(scores, reverse=True), "Pairs must be sorted by score descending"
def test_resonance_limit_respected(tmp_path):
archive = _archive(tmp_path)
for i in range(10):
ingest_event(archive, title=f"Python entry {i}", content=f"Python automation entry number {i}")
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=3)
assert len(pairs) <= 3
def test_resonance_topic_filter(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Python tools", content="Python automation tooling", topics=["python"])
e2 = ingest_event(archive, title="Python scripts", content="Python automation scripting", topics=["python"])
e3 = ingest_event(archive, title="Cooking pasta", content="Pasta carbonara recipe cooking", topics=["cooking"])
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=20, topic="python")
for p in pairs:
a_topics = [t.lower() for t in p["entry_a"]["topics"]]
b_topics = [t.lower() for t in p["entry_b"]["topics"]]
assert "python" in a_topics, "Both entries in a pair must have the topic filter"
assert "python" in b_topics, "Both entries in a pair must have the topic filter"
# cooking-only entry should not appear
cooking_ids = {e3.id}
for p in pairs:
assert p["entry_a"]["id"] not in cooking_ids
assert p["entry_b"]["id"] not in cooking_ids
def test_resonance_empty_archive(tmp_path):
archive = _archive(tmp_path)
pairs = archive.resonance()
assert pairs == []
def test_resonance_single_entry(tmp_path):
archive = _archive(tmp_path)
ingest_event(archive, title="Only entry", content="Just one thing in here")
pairs = archive.resonance()
assert pairs == []
def test_resonance_result_structure(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Alpha topic one", content="Shared vocabulary alpha beta gamma")
e2 = ingest_event(archive, title="Alpha topic two", content="Shared vocabulary alpha beta delta")
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=5)
assert len(pairs) >= 1
pair = pairs[0]
assert "entry_a" in pair
assert "entry_b" in pair
assert "score" in pair
assert "id" in pair["entry_a"]
assert "title" in pair["entry_a"]
assert "topics" in pair["entry_a"]
assert isinstance(pair["score"], float)
assert 0.0 <= pair["score"] <= 1.0

View File

@@ -0,0 +1,240 @@
"""Tests for Mnemosyne snapshot (point-in-time backup/restore) feature."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive(tmp_dir: str) -> MnemosyneArchive:
path = Path(tmp_dir) / "archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
# ─── snapshot_create ─────────────────────────────────────────────────────────
def test_snapshot_create_returns_metadata():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Alpha", content="First entry", topics=["a"])
ingest_event(archive, title="Beta", content="Second entry", topics=["b"])
result = archive.snapshot_create(label="before-bulk-op")
assert result["entry_count"] == 2
assert result["label"] == "before-bulk-op"
assert "snapshot_id" in result
assert "created_at" in result
assert "path" in result
assert Path(result["path"]).exists()
def test_snapshot_create_no_label():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Gamma", content="Third entry", topics=[])
result = archive.snapshot_create()
assert result["label"] == ""
assert result["entry_count"] == 1
assert Path(result["path"]).exists()
def test_snapshot_file_contains_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Delta", content="Fourth entry", topics=["d"])
result = archive.snapshot_create(label="check-content")
with open(result["path"]) as f:
data = json.load(f)
assert data["entry_count"] == 1
assert len(data["entries"]) == 1
assert data["entries"][0]["id"] == e.id
assert data["entries"][0]["title"] == "Delta"
def test_snapshot_create_empty_archive():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
result = archive.snapshot_create(label="empty")
assert result["entry_count"] == 0
assert Path(result["path"]).exists()
# ─── snapshot_list ───────────────────────────────────────────────────────────
def test_snapshot_list_empty():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
assert archive.snapshot_list() == []
def test_snapshot_list_returns_all():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="One", content="c1", topics=[])
archive.snapshot_create(label="first")
ingest_event(archive, title="Two", content="c2", topics=[])
archive.snapshot_create(label="second")
snapshots = archive.snapshot_list()
assert len(snapshots) == 2
labels = {s["label"] for s in snapshots}
assert "first" in labels
assert "second" in labels
def test_snapshot_list_metadata_fields():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="meta-check")
snapshots = archive.snapshot_list()
s = snapshots[0]
for key in ("snapshot_id", "label", "created_at", "entry_count", "path"):
assert key in s
def test_snapshot_list_newest_first():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="a")
archive.snapshot_create(label="b")
snapshots = archive.snapshot_list()
        # Snapshot filenames embed a UTC timestamp, and snapshot_list sorts
        # the glob in reverse, so the newest snapshot comes first.
assert len(snapshots) == 2
# Both should be present; ordering is newest first
ids = [s["snapshot_id"] for s in snapshots]
assert ids == sorted(ids, reverse=True)
# ─── snapshot_restore ────────────────────────────────────────────────────────
def test_snapshot_restore_replaces_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Kept", content="original content", topics=["orig"])
snap = archive.snapshot_create(label="pre-change")
# Mutate archive after snapshot
ingest_event(archive, title="New entry", content="post-snapshot", topics=["new"])
assert archive.count == 2
result = archive.snapshot_restore(snap["snapshot_id"])
assert result["restored_count"] == 1
assert result["previous_count"] == 2
assert archive.count == 1
entry = list(archive._entries.values())[0]
assert entry.title == "Kept"
def test_snapshot_restore_persists_to_disk():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = _make_archive(tmp)
ingest_event(archive, title="Persisted", content="should survive reload", topics=[])
snap = archive.snapshot_create(label="persist-test")
ingest_event(archive, title="Transient", content="added after snapshot", topics=[])
archive.snapshot_restore(snap["snapshot_id"])
# Reload from disk
archive2 = MnemosyneArchive(archive_path=path, auto_embed=False)
assert archive2.count == 1
assert list(archive2._entries.values())[0].title == "Persisted"
def test_snapshot_restore_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_restore("nonexistent_snapshot_id")
# ─── snapshot_diff ───────────────────────────────────────────────────────────
def test_snapshot_diff_no_changes():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Stable", content="unchanged content", topics=[])
snap = archive.snapshot_create(label="baseline")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["added"] == []
assert diff["removed"] == []
assert diff["modified"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_added():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Original", content="existing", topics=[])
snap = archive.snapshot_create(label="before-add")
ingest_event(archive, title="Newcomer", content="added after", topics=[])
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["added"]) == 1
assert diff["added"][0]["title"] == "Newcomer"
assert diff["removed"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_removed():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e1 = ingest_event(archive, title="Will Be Removed", content="doomed", topics=[])
ingest_event(archive, title="Survivor", content="stays", topics=[])
snap = archive.snapshot_create(label="pre-removal")
archive.remove(e1.id)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["removed"]) == 1
assert diff["removed"][0]["title"] == "Will Be Removed"
assert diff["added"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_modified():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Mutable", content="original content", topics=[])
snap = archive.snapshot_create(label="pre-edit")
archive.update_entry(e.id, content="updated content", auto_link=False)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["modified"]) == 1
assert diff["modified"][0]["title"] == "Mutable"
assert diff["modified"][0]["snapshot_hash"] != diff["modified"][0]["current_hash"]
assert diff["added"] == []
assert diff["removed"] == []
def test_snapshot_diff_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_diff("no_such_snapshot")
def test_snapshot_diff_includes_snapshot_id():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
snap = archive.snapshot_create(label="id-check")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["snapshot_id"] == snap["snapshot_id"]