Compare commits
8 Commits
feat/mnemo
...
feat/mnemo
| Author | SHA1 | Date | |
|---|---|---|---|
| b28b9163ee | |||
| fdbb4e7b5c | |||
| 14c431190b | |||
| ccde99e749 | |||
| 09b5ea24f4 | |||
| 1eb1ec69e9 | |||
| 30fcc00067 | |||
| fd8f82315c |
@@ -197,6 +197,18 @@ planned:
|
||||
merged_prs:
|
||||
- "#TBD"
|
||||
|
||||
memory_resonance:
|
||||
status: planned
|
||||
files: [archive.py, cli.py, tests/test_resonance.py]
|
||||
description: >
|
||||
Discover latent connections — semantically similar entry pairs
|
||||
that are NOT linked in the holographic graph. Surfaces hidden
|
||||
thematic patterns and potential missing links.
|
||||
priority: medium
|
||||
merged_prs:
|
||||
- "#TBD"
|
||||
issue: "#1272"
|
||||
|
||||
memory_consolidation:
|
||||
status: shipped
|
||||
files: [archive.py, cli.py, tests/test_consolidation.py]
|
||||
|
||||
@@ -14,18 +14,18 @@ from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.linker import HolographicLinker
|
||||
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
|
||||
from nexus.mnemosyne.embeddings import (
|
||||
EmbeddingBackend,
|
||||
OllamaEmbeddingBackend,
|
||||
TfidfEmbeddingBackend,
|
||||
get_embedding_backend,
|
||||
)
|
||||
from nexus.mnemosyne.snapshot import (
|
||||
snapshot_create,
|
||||
snapshot_list,
|
||||
snapshot_restore,
|
||||
snapshot_diff,
|
||||
)
|
||||
from nexus.mnemosyne.embeddings import (
|
||||
EmbeddingBackend,
|
||||
OllamaEmbeddingBackend,
|
||||
TfidfEmbeddingBackend,
|
||||
get_embedding_backend,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"MnemosyneArchive",
|
||||
|
||||
@@ -1105,6 +1105,241 @@ class MnemosyneArchive:
|
||||
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
|
||||
return steps
|
||||
|
||||
# ─── Snapshot / Backup ────────────────────────────────────
|
||||
|
||||
def _snapshot_dir(self) -> Path:
|
||||
"""Return (and create) the snapshots directory next to the archive."""
|
||||
d = self.path.parent / "snapshots"
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d
|
||||
|
||||
@staticmethod
|
||||
def _snapshot_filename(timestamp: str, label: str) -> str:
|
||||
"""Build a deterministic snapshot filename."""
|
||||
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
|
||||
return f"{timestamp}_{safe_label}.json"
|
||||
|
||||
def snapshot_create(self, label: str = "") -> dict:
|
||||
"""Serialize the current archive state to a timestamped snapshot file.
|
||||
|
||||
Args:
|
||||
label: Human-readable label for the snapshot (optional).
|
||||
|
||||
Returns:
|
||||
Dict with keys: snapshot_id, label, created_at, entry_count, path
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
timestamp = now.strftime("%Y%m%d_%H%M%S")
|
||||
filename = self._snapshot_filename(timestamp, label)
|
||||
snapshot_id = filename[:-5] # strip .json
|
||||
snap_path = self._snapshot_dir() / filename
|
||||
|
||||
payload = {
|
||||
"snapshot_id": snapshot_id,
|
||||
"label": label,
|
||||
"created_at": now.isoformat(),
|
||||
"entry_count": len(self._entries),
|
||||
"archive_path": str(self.path),
|
||||
"entries": [e.to_dict() for e in self._entries.values()],
|
||||
}
|
||||
with open(snap_path, "w") as f:
|
||||
json.dump(payload, f, indent=2)
|
||||
|
||||
return {
|
||||
"snapshot_id": snapshot_id,
|
||||
"label": label,
|
||||
"created_at": payload["created_at"],
|
||||
"entry_count": payload["entry_count"],
|
||||
"path": str(snap_path),
|
||||
}
|
||||
|
||||
def snapshot_list(self) -> list[dict]:
|
||||
"""List available snapshots, newest first.
|
||||
|
||||
Returns:
|
||||
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
|
||||
"""
|
||||
snap_dir = self._snapshot_dir()
|
||||
snapshots = []
|
||||
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
|
||||
try:
|
||||
with open(snap_path) as f:
|
||||
data = json.load(f)
|
||||
snapshots.append({
|
||||
"snapshot_id": data.get("snapshot_id", snap_path.stem),
|
||||
"label": data.get("label", ""),
|
||||
"created_at": data.get("created_at", ""),
|
||||
"entry_count": data.get("entry_count", len(data.get("entries", []))),
|
||||
"path": str(snap_path),
|
||||
})
|
||||
except (json.JSONDecodeError, OSError):
|
||||
continue
|
||||
return snapshots
|
||||
|
||||
def snapshot_restore(self, snapshot_id: str) -> dict:
|
||||
"""Restore the archive from a snapshot, replacing all current entries.
|
||||
|
||||
Args:
|
||||
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
|
||||
|
||||
Returns:
|
||||
Dict with keys: snapshot_id, restored_count, previous_count
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If no snapshot with that ID exists.
|
||||
"""
|
||||
snap_dir = self._snapshot_dir()
|
||||
snap_path = snap_dir / f"{snapshot_id}.json"
|
||||
if not snap_path.exists():
|
||||
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
|
||||
|
||||
with open(snap_path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
previous_count = len(self._entries)
|
||||
self._entries = {}
|
||||
for entry_data in data.get("entries", []):
|
||||
entry = ArchiveEntry.from_dict(entry_data)
|
||||
self._entries[entry.id] = entry
|
||||
|
||||
self._save()
|
||||
return {
|
||||
"snapshot_id": snapshot_id,
|
||||
"restored_count": len(self._entries),
|
||||
"previous_count": previous_count,
|
||||
}
|
||||
|
||||
def snapshot_diff(self, snapshot_id: str) -> dict:
|
||||
"""Compare a snapshot against the current archive state.
|
||||
|
||||
Args:
|
||||
snapshot_id: The snapshot_id to compare against current state.
|
||||
|
||||
Returns:
|
||||
Dict with keys:
|
||||
- snapshot_id: str
|
||||
- added: list of {id, title} — in current, not in snapshot
|
||||
- removed: list of {id, title} — in snapshot, not in current
|
||||
- modified: list of {id, title, snapshot_hash, current_hash}
|
||||
- unchanged: int — count of identical entries
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If no snapshot with that ID exists.
|
||||
"""
|
||||
snap_dir = self._snapshot_dir()
|
||||
snap_path = snap_dir / f"{snapshot_id}.json"
|
||||
if not snap_path.exists():
|
||||
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
|
||||
|
||||
with open(snap_path) as f:
|
||||
data = json.load(f)
|
||||
|
||||
snap_entries: dict[str, dict] = {}
|
||||
for entry_data in data.get("entries", []):
|
||||
snap_entries[entry_data["id"]] = entry_data
|
||||
|
||||
current_ids = set(self._entries.keys())
|
||||
snap_ids = set(snap_entries.keys())
|
||||
|
||||
added = []
|
||||
for eid in current_ids - snap_ids:
|
||||
e = self._entries[eid]
|
||||
added.append({"id": e.id, "title": e.title})
|
||||
|
||||
removed = []
|
||||
for eid in snap_ids - current_ids:
|
||||
snap_e = snap_entries[eid]
|
||||
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
|
||||
|
||||
modified = []
|
||||
unchanged = 0
|
||||
for eid in current_ids & snap_ids:
|
||||
current_hash = self._entries[eid].content_hash
|
||||
snap_hash = snap_entries[eid].get("content_hash")
|
||||
if current_hash != snap_hash:
|
||||
modified.append({
|
||||
"id": eid,
|
||||
"title": self._entries[eid].title,
|
||||
"snapshot_hash": snap_hash,
|
||||
"current_hash": current_hash,
|
||||
})
|
||||
else:
|
||||
unchanged += 1
|
||||
|
||||
return {
|
||||
"snapshot_id": snapshot_id,
|
||||
"added": sorted(added, key=lambda x: x["title"]),
|
||||
"removed": sorted(removed, key=lambda x: x["title"]),
|
||||
"modified": sorted(modified, key=lambda x: x["title"]),
|
||||
"unchanged": unchanged,
|
||||
}
|
||||
|
||||
def resonance(
|
||||
self,
|
||||
threshold: float = 0.3,
|
||||
limit: int = 20,
|
||||
topic: Optional[str] = None,
|
||||
) -> list[dict]:
|
||||
"""Discover latent connections — pairs with high similarity but no existing link.
|
||||
|
||||
The holographic linker connects entries above its threshold at ingest
|
||||
time. ``resonance()`` finds entry pairs that are *semantically close*
|
||||
but have *not* been linked — the hidden potential edges in the graph.
|
||||
These "almost-connected" pairs reveal thematic overlap that was missed
|
||||
because entries were ingested at different times or sit just below the
|
||||
linker threshold.
|
||||
|
||||
Args:
|
||||
threshold: Minimum similarity score to surface a pair (default 0.3).
|
||||
Pairs already linked are excluded regardless of score.
|
||||
limit: Maximum number of pairs to return (default 20).
|
||||
topic: If set, restrict candidates to entries that carry this topic
|
||||
(case-insensitive). Both entries in a pair must match.
|
||||
|
||||
Returns:
|
||||
List of dicts, sorted by ``score`` descending::
|
||||
|
||||
{
|
||||
"entry_a": {"id": str, "title": str, "topics": list[str]},
|
||||
"entry_b": {"id": str, "title": str, "topics": list[str]},
|
||||
"score": float, # similarity in [0, 1]
|
||||
}
|
||||
"""
|
||||
entries = list(self._entries.values())
|
||||
|
||||
if topic:
|
||||
topic_lower = topic.lower()
|
||||
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
|
||||
|
||||
results: list[dict] = []
|
||||
|
||||
for i, entry_a in enumerate(entries):
|
||||
for entry_b in entries[i + 1:]:
|
||||
# Skip pairs that are already linked
|
||||
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
|
||||
continue
|
||||
|
||||
score = self.linker.compute_similarity(entry_a, entry_b)
|
||||
if score < threshold:
|
||||
continue
|
||||
|
||||
results.append({
|
||||
"entry_a": {
|
||||
"id": entry_a.id,
|
||||
"title": entry_a.title,
|
||||
"topics": entry_a.topics,
|
||||
},
|
||||
"entry_b": {
|
||||
"id": entry_b.id,
|
||||
"title": entry_b.title,
|
||||
"topics": entry_b.topics,
|
||||
},
|
||||
"score": round(score, 4),
|
||||
})
|
||||
|
||||
results.sort(key=lambda x: x["score"], reverse=True)
|
||||
return results[:limit]
|
||||
|
||||
def rebuild_links(self, threshold: Optional[float] = None) -> int:
|
||||
"""Recompute all links from scratch.
|
||||
|
||||
@@ -1139,3 +1374,36 @@ class MnemosyneArchive:
|
||||
|
||||
self._save()
|
||||
return total_links
|
||||
|
||||
# ─── Discovery ──────────────────────────────────────────────
|
||||
def discover(self, count=5, prefer_fading=True, topic=None):
|
||||
import random
|
||||
candidates = list(self._entries.values())
|
||||
if topic: candidates = [e for e in candidates if topic.lower() in [t.lower() for t in e.topics]]
|
||||
if not candidates: return []
|
||||
scored = [(e, self._compute_vitality(e)) for e in candidates]
|
||||
weights = [max(0.01, 1.0 - v) if prefer_fading else max(0.01, v) for _, v in scored]
|
||||
selected = random.choices(range(len(scored)), weights=weights, k=min(count, len(scored)))
|
||||
results = []
|
||||
for idx in set(selected):
|
||||
e, v = scored[idx]
|
||||
self.touch(e.id)
|
||||
results.append({"entry_id": e.id, "title": e.title, "topics": e.topics, "vitality": round(v, 4)})
|
||||
return results
|
||||
|
||||
def resonance(self, min_similarity=0.25, max_similarity=1.0, limit=20, topic=None):
|
||||
entries = list(self._entries.values())
|
||||
if topic: entries = [e for e in entries if topic in e.topics]
|
||||
linked = set()
|
||||
for e in entries:
|
||||
for l in e.links: linked.add(tuple(sorted([e.id, l])))
|
||||
res = []
|
||||
for i in range(len(entries)):
|
||||
for j in range(i+1, len(entries)):
|
||||
a, b = entries[i], entries[j]
|
||||
if tuple(sorted([a.id, b.id])) in linked: continue
|
||||
s = self.linker.compute_similarity(a, b)
|
||||
if min_similarity <= s <= max_similarity:
|
||||
res.append({"entry_a": a.id, "entry_b": b.id, "title_a": a.title, "title_b": b.title, "similarity": round(s, 4)})
|
||||
res.sort(key=lambda x: x["similarity"], reverse=True)
|
||||
return res[:limit]
|
||||
|
||||
@@ -7,7 +7,8 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
|
||||
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
|
||||
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
|
||||
mnemosyne fading, mnemosyne vibrant,
|
||||
mnemosyne snapshot create|list|restore|diff
|
||||
mnemosyne snapshot create|list|restore|diff,
|
||||
mnemosyne resonance
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -19,7 +20,7 @@ import sys
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
from nexus.mnemosyne.snapshot import snapshot_create, snapshot_list, snapshot_restore, snapshot_diff
|
||||
from nexus.mnemosyne.snapshot import snapshot_create, snapshot_list, snapshot_restore, snapshot_diff, ingest_directory
|
||||
|
||||
|
||||
def cmd_stats(args):
|
||||
@@ -65,6 +66,13 @@ def cmd_ingest(args):
|
||||
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
|
||||
|
||||
|
||||
def cmd_ingest_dir(args):
|
||||
archive = MnemosyneArchive()
|
||||
ext = [e.strip() for e in args.ext.split(",")] if args.ext else None
|
||||
added = ingest_directory(archive, args.path, extensions=ext)
|
||||
print(f"Ingested {added} new entries from {args.path}")
|
||||
|
||||
|
||||
def cmd_link(args):
|
||||
archive = MnemosyneArchive()
|
||||
entry = archive.get(args.entry_id)
|
||||
@@ -305,6 +313,86 @@ def cmd_fading(args):
|
||||
print()
|
||||
|
||||
|
||||
def cmd_snapshot(args):
|
||||
archive = MnemosyneArchive()
|
||||
if args.snapshot_cmd == "create":
|
||||
result = archive.snapshot_create(label=args.label or "")
|
||||
print(f"Snapshot created: {result['snapshot_id']}")
|
||||
print(f" Label: {result['label'] or '(none)'}")
|
||||
print(f" Entries: {result['entry_count']}")
|
||||
print(f" Path: {result['path']}")
|
||||
elif args.snapshot_cmd == "list":
|
||||
snapshots = archive.snapshot_list()
|
||||
if not snapshots:
|
||||
print("No snapshots found.")
|
||||
return
|
||||
for s in snapshots:
|
||||
print(f"[{s['snapshot_id']}]")
|
||||
print(f" Label: {s['label'] or '(none)'}")
|
||||
print(f" Created: {s['created_at']}")
|
||||
print(f" Entries: {s['entry_count']}")
|
||||
print()
|
||||
elif args.snapshot_cmd == "restore":
|
||||
try:
|
||||
result = archive.snapshot_restore(args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Restored from snapshot: {result['snapshot_id']}")
|
||||
print(f" Entries restored: {result['restored_count']}")
|
||||
print(f" Previous count: {result['previous_count']}")
|
||||
elif args.snapshot_cmd == "diff":
|
||||
try:
|
||||
diff = archive.snapshot_diff(args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Diff vs snapshot: {diff['snapshot_id']}")
|
||||
print(f" Added ({len(diff['added'])}): ", end="")
|
||||
if diff["added"]:
|
||||
print()
|
||||
for e in diff["added"]:
|
||||
print(f" + [{e['id'][:8]}] {e['title']}")
|
||||
else:
|
||||
print("none")
|
||||
print(f" Removed ({len(diff['removed'])}): ", end="")
|
||||
if diff["removed"]:
|
||||
print()
|
||||
for e in diff["removed"]:
|
||||
print(f" - [{e['id'][:8]}] {e['title']}")
|
||||
else:
|
||||
print("none")
|
||||
print(f" Modified({len(diff['modified'])}): ", end="")
|
||||
if diff["modified"]:
|
||||
print()
|
||||
for e in diff["modified"]:
|
||||
print(f" ~ [{e['id'][:8]}] {e['title']}")
|
||||
else:
|
||||
print("none")
|
||||
print(f" Unchanged: {diff['unchanged']}")
|
||||
else:
|
||||
print(f"Unknown snapshot subcommand: {args.snapshot_cmd}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_resonance(args):
|
||||
archive = MnemosyneArchive()
|
||||
topic = args.topic if args.topic else None
|
||||
pairs = archive.resonance(threshold=args.threshold, limit=args.limit, topic=topic)
|
||||
if not pairs:
|
||||
print("No resonant pairs found.")
|
||||
return
|
||||
for p in pairs:
|
||||
a = p["entry_a"]
|
||||
b = p["entry_b"]
|
||||
print(f"Score: {p['score']:.4f}")
|
||||
print(f" [{a['id'][:8]}] {a['title']}")
|
||||
print(f" Topics: {', '.join(a['topics']) if a['topics'] else '(none)'}")
|
||||
print(f" [{b['id'][:8]}] {b['title']}")
|
||||
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
|
||||
print()
|
||||
|
||||
|
||||
def cmd_vibrant(args):
|
||||
archive = MnemosyneArchive()
|
||||
results = archive.vibrant(limit=args.limit)
|
||||
@@ -317,58 +405,6 @@ def cmd_vibrant(args):
|
||||
print()
|
||||
|
||||
|
||||
|
||||
def cmd_snapshot_create(args):
|
||||
archive = MnemosyneArchive()
|
||||
result = snapshot_create(archive, label=args.label)
|
||||
print(f"Snapshot created: {result['snapshot_id']}")
|
||||
print(f" Entries: {result['entry_count']}")
|
||||
print(f" Label: {result['label'] or '(none)'}")
|
||||
print(f" Path: {result['path']}")
|
||||
|
||||
|
||||
def cmd_snapshot_list(args):
|
||||
archive = MnemosyneArchive()
|
||||
snaps = snapshot_list(archive)
|
||||
if not snaps:
|
||||
print("No snapshots found.")
|
||||
return
|
||||
for s in snaps:
|
||||
label = f" ({s['label']})" if s['label'] else ""
|
||||
print(f" {s['snapshot_id']} {s['created_at'][:19]} {s['entry_count']} entries{label}")
|
||||
|
||||
|
||||
def cmd_snapshot_restore(args):
|
||||
archive = MnemosyneArchive()
|
||||
try:
|
||||
result = snapshot_restore(archive, args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Restored snapshot {result['snapshot_id']}")
|
||||
print(f" Entries restored: {result['restored_entries']}")
|
||||
print(f" Previous count: {result['previous_count']}")
|
||||
|
||||
|
||||
def cmd_snapshot_diff(args):
|
||||
archive = MnemosyneArchive()
|
||||
try:
|
||||
result = snapshot_diff(archive, args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Diff: snapshot {result['snapshot_id']} vs current")
|
||||
print(f" Snapshot: {result['snapshot_entries']} entries")
|
||||
print(f" Current: {result['current_entries']} entries")
|
||||
print(f" Added: {result['added']}")
|
||||
print(f" Removed: {result['removed']}")
|
||||
print(f" Changed: {result['changed']}")
|
||||
if result['changed_details']:
|
||||
print()
|
||||
for c in result['changed_details']:
|
||||
print(f" [{c['id'][:8]}] {c['title']}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
@@ -385,6 +421,10 @@ def main():
|
||||
i.add_argument("--content", required=True)
|
||||
i.add_argument("--topics", default="", help="Comma-separated topics")
|
||||
|
||||
id_ = sub.add_parser("ingest-dir", help="Ingest a directory of files")
|
||||
id_.add_argument("path", help="Directory to ingest")
|
||||
id_.add_argument("--ext", default="", help="Comma-separated extensions (default: md,txt,json)")
|
||||
|
||||
l = sub.add_parser("link", help="Show linked entries")
|
||||
l.add_argument("entry_id", help="Entry ID (or prefix)")
|
||||
l.add_argument("-d", "--depth", type=int, default=1)
|
||||
@@ -455,30 +495,34 @@ def main():
|
||||
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
|
||||
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
|
||||
|
||||
rs = sub.add_parser("resonance", help="Discover latent connections between entries")
|
||||
rs.add_argument("-t", "--threshold", type=float, default=0.3, help="Minimum similarity score (default: 0.3)")
|
||||
rs.add_argument("-n", "--limit", type=int, default=20, help="Max pairs to show (default: 20)")
|
||||
rs.add_argument("--topic", default="", help="Restrict to entries with this topic")
|
||||
|
||||
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
|
||||
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
|
||||
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
|
||||
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
|
||||
sn_sub.add_parser("list", help="List available snapshots")
|
||||
sn_restore = sn_sub.add_parser("restore", help="Restore archive from a snapshot")
|
||||
sn_restore.add_argument("snapshot_id", help="Snapshot ID to restore")
|
||||
sn_diff = sn_sub.add_parser("diff", help="Show what changed since a snapshot")
|
||||
sn_diff.add_argument("snapshot_id", help="Snapshot ID to compare against")
|
||||
|
||||
args = parser.parse_args()
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
# Snapshot subcommands
|
||||
sp = sub.add_parser("snapshot", help="Archive snapshot operations")
|
||||
sp_sub = sp.add_subparsers(dest="snapshot_command")
|
||||
|
||||
sp_create = sp_sub.add_parser("create", help="Create a point-in-time snapshot")
|
||||
sp_create.add_argument("-l", "--label", default="", help="Human-readable label")
|
||||
|
||||
sp_sub.add_parser("list", help="List available snapshots")
|
||||
|
||||
sp_restore = sp_sub.add_parser("restore", help="Restore from a snapshot")
|
||||
sp_restore.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
|
||||
|
||||
sp_diff = sp_sub.add_parser("diff", help="Diff snapshot vs current archive")
|
||||
sp_diff.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
|
||||
if args.command == "snapshot" and not args.snapshot_cmd:
|
||||
sn.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
dispatch = {
|
||||
"stats": cmd_stats,
|
||||
"search": cmd_search,
|
||||
"ingest": cmd_ingest,
|
||||
"ingest-dir": cmd_ingest_dir,
|
||||
"link": cmd_link,
|
||||
"topics": cmd_topics,
|
||||
"remove": cmd_remove,
|
||||
@@ -500,25 +544,26 @@ def main():
|
||||
"fading": cmd_fading,
|
||||
"vibrant": cmd_vibrant,
|
||||
"snapshot": lambda args: _dispatch_snapshot(args),
|
||||
"discover": cmd_discover,
|
||||
"resonance": cmd_resonance,
|
||||
"resonance": cmd_resonance,
|
||||
"snapshot": cmd_snapshot,
|
||||
}
|
||||
dispatch[args.command](args)
|
||||
|
||||
|
||||
def _dispatch_snapshot(args):
|
||||
"""Route snapshot subcommands to handlers."""
|
||||
cmd = getattr(args, "snapshot_command", None)
|
||||
if cmd == "create":
|
||||
cmd_snapshot_create(args)
|
||||
elif cmd == "list":
|
||||
cmd_snapshot_list(args)
|
||||
elif cmd == "restore":
|
||||
cmd_snapshot_restore(args)
|
||||
elif cmd == "diff":
|
||||
cmd_snapshot_diff(args)
|
||||
else:
|
||||
print("Usage: mnemosyne snapshot {create|list|restore|diff}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
def _dispatch_snapshot(args):
|
||||
cmd = getattr(args, "snapshot_command", None)
|
||||
if cmd == "create": print("Snapshot created")
|
||||
elif cmd == "list": print("Snapshots listed")
|
||||
|
||||
def cmd_discover(args):
|
||||
archive = MnemosyneArchive()
|
||||
for r in archive.discover(count=args.count, topic=args.topic): print(f"[{r['entry_id'][:8]}] {r['title']}")
|
||||
|
||||
def cmd_resonance(args):
|
||||
archive = MnemosyneArchive()
|
||||
for r in archive.resonance(min_similarity=args.threshold, limit=args.limit, topic=args.topic): print(f"[{r['entry_a'][:8]}] {r['title_a']} <-> {r['title_b']}")
|
||||
|
||||
@@ -1,206 +0,0 @@
|
||||
"""Archive snapshot — point-in-time backup and restore.
|
||||
|
||||
Lets users create timestamped snapshots of the archive, list them,
|
||||
restore from any snapshot, and diff a snapshot against the current state.
|
||||
Snapshots are stored as JSON files in a ``snapshots/`` subdirectory next
|
||||
to the archive file.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
|
||||
|
||||
def _snapshots_dir(archive: MnemosyneArchive) -> Path:
|
||||
"""Return the snapshots directory, creating it if needed."""
|
||||
d = archive.path.parent / "snapshots"
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d
|
||||
|
||||
|
||||
def snapshot_create(
|
||||
archive: MnemosyneArchive,
|
||||
label: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Create a point-in-time snapshot of the archive.
|
||||
|
||||
Args:
|
||||
archive: The archive to snapshot.
|
||||
label: Optional human-readable label for the snapshot.
|
||||
|
||||
Returns:
|
||||
Dict with keys: snapshot_id, label, created_at, entry_count, path
|
||||
"""
|
||||
snapshot_id = str(uuid.uuid4())[:8]
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
data = {
|
||||
"snapshot_id": snapshot_id,
|
||||
"label": label or "",
|
||||
"created_at": now,
|
||||
"entry_count": archive.count,
|
||||
"entries": [e.to_dict() for e in archive._entries.values()],
|
||||
}
|
||||
|
||||
path = _snapshots_dir(archive) / f"{snapshot_id}.json"
|
||||
with open(path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
return {
|
||||
"snapshot_id": snapshot_id,
|
||||
"label": label or "",
|
||||
"created_at": now,
|
||||
"entry_count": archive.count,
|
||||
"path": str(path),
|
||||
}
|
||||
|
||||
|
||||
def snapshot_list(archive: MnemosyneArchive) -> list[dict]:
|
||||
"""List all available snapshots, newest first.
|
||||
|
||||
Returns:
|
||||
List of dicts with keys: snapshot_id, label, created_at, entry_count
|
||||
"""
|
||||
snapshots = []
|
||||
d = _snapshots_dir(archive)
|
||||
for f in sorted(d.glob("*.json")):
|
||||
try:
|
||||
with open(f) as fh:
|
||||
meta = json.load(fh)
|
||||
snapshots.append({
|
||||
"snapshot_id": meta["snapshot_id"],
|
||||
"label": meta.get("label", ""),
|
||||
"created_at": meta["created_at"],
|
||||
"entry_count": meta["entry_count"],
|
||||
})
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
continue
|
||||
# Newest first
|
||||
snapshots.sort(key=lambda s: s["created_at"], reverse=True)
|
||||
return snapshots
|
||||
|
||||
|
||||
def snapshot_restore(
|
||||
archive: MnemosyneArchive,
|
||||
snapshot_id: str,
|
||||
) -> dict:
|
||||
"""Restore the archive from a snapshot.
|
||||
|
||||
Replaces ALL current entries with the snapshot data. The archive is
|
||||
saved immediately after restore.
|
||||
|
||||
Args:
|
||||
archive: The archive to restore into.
|
||||
snapshot_id: ID of the snapshot to restore (or unique prefix).
|
||||
|
||||
Returns:
|
||||
Dict with keys: snapshot_id, restored_entries, previous_count
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If no matching snapshot is found.
|
||||
"""
|
||||
d = _snapshots_dir(archive)
|
||||
|
||||
# Find snapshot file by prefix match
|
||||
snapshot_path = None
|
||||
for f in d.glob("*.json"):
|
||||
if f.stem.startswith(snapshot_id):
|
||||
snapshot_path = f
|
||||
break
|
||||
|
||||
if snapshot_path is None:
|
||||
raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
|
||||
|
||||
with open(snapshot_path) as fh:
|
||||
data = json.load(fh)
|
||||
|
||||
previous_count = archive.count
|
||||
|
||||
# Clear and restore
|
||||
archive._entries = {}
|
||||
for entry_data in data["entries"]:
|
||||
entry = ArchiveEntry.from_dict(entry_data)
|
||||
archive._entries[entry.id] = entry
|
||||
archive._save()
|
||||
|
||||
return {
|
||||
"snapshot_id": data["snapshot_id"],
|
||||
"label": data.get("label", ""),
|
||||
"restored_entries": len(data["entries"]),
|
||||
"previous_count": previous_count,
|
||||
}
|
||||
|
||||
|
||||
def snapshot_diff(
|
||||
archive: MnemosyneArchive,
|
||||
snapshot_id: str,
|
||||
) -> dict:
|
||||
"""Compare a snapshot against the current archive state.
|
||||
|
||||
Args:
|
||||
archive: The current archive.
|
||||
snapshot_id: ID of the snapshot to compare (or unique prefix).
|
||||
|
||||
Returns:
|
||||
Dict with keys: snapshot_id, snapshot_entries, current_entries,
|
||||
added (in current but not snapshot), removed (in snapshot but not current),
|
||||
changed (same ID but different content_hash)
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If no matching snapshot is found.
|
||||
"""
|
||||
d = _snapshots_dir(archive)
|
||||
|
||||
snapshot_path = None
|
||||
for f in d.glob("*.json"):
|
||||
if f.stem.startswith(snapshot_id):
|
||||
snapshot_path = f
|
||||
break
|
||||
|
||||
if snapshot_path is None:
|
||||
raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
|
||||
|
||||
with open(snapshot_path) as fh:
|
||||
data = json.load(fh)
|
||||
|
||||
snap_entries = {e["id"]: e for e in data["entries"]}
|
||||
curr_entries = {e.id: e.to_dict() for e in archive._entries.values()}
|
||||
|
||||
snap_ids = set(snap_entries.keys())
|
||||
curr_ids = set(curr_entries.keys())
|
||||
|
||||
added_ids = curr_ids - snap_ids
|
||||
removed_ids = snap_ids - curr_ids
|
||||
common_ids = snap_ids & curr_ids
|
||||
|
||||
changed = []
|
||||
for eid in common_ids:
|
||||
snap_hash = snap_entries[eid].get("content_hash", "")
|
||||
curr_hash = curr_entries[eid].get("content_hash", "")
|
||||
if snap_hash != curr_hash:
|
||||
changed.append({
|
||||
"id": eid,
|
||||
"title": curr_entries[eid].get("title", ""),
|
||||
"snapshot_hash": snap_hash,
|
||||
"current_hash": curr_hash,
|
||||
})
|
||||
|
||||
return {
|
||||
"snapshot_id": data["snapshot_id"],
|
||||
"label": data.get("label", ""),
|
||||
"snapshot_entries": len(snap_entries),
|
||||
"current_entries": len(curr_entries),
|
||||
"added": len(added_ids),
|
||||
"removed": len(removed_ids),
|
||||
"changed": len(changed),
|
||||
"added_ids": sorted(added_ids),
|
||||
"removed_ids": sorted(removed_ids),
|
||||
"changed_details": changed,
|
||||
}
|
||||
94
nexus/mnemosyne/tests/test_resonance.py
Normal file
94
nexus/mnemosyne/tests/test_resonance.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Tests for MnemosyneArchive.resonance() — latent connection discovery."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def archive(tmp_path):
|
||||
"""Create an archive with test entries."""
|
||||
path = tmp_path / "test_archive.json"
|
||||
arch = MnemosyneArchive(archive_path=path)
|
||||
|
||||
arch.add(ArchiveEntry(title="Python Basics", content="Variables, loops, functions in Python programming", topics=["programming"]))
|
||||
arch.add(ArchiveEntry(title="JavaScript Basics", content="Variables, loops, functions in JavaScript programming", topics=["programming"]))
|
||||
arch.add(ArchiveEntry(title="Cooking Pasta", content="Boil water, add salt, cook pasta for 10 minutes", topics=["cooking"]))
|
||||
arch.add(ArchiveEntry(title="Italian Recipes", content="Traditional Italian pasta and sauce recipes", topics=["cooking"]))
|
||||
arch.add(ArchiveEntry(title="Neural Networks", content="Deep learning with backpropagation and gradient descent", topics=["ai"]))
|
||||
|
||||
return arch
|
||||
|
||||
|
||||
def test_resonance_returns_unlinked_pairs(archive):
|
||||
"""Resonance should return pairs that are semantically similar but not linked."""
|
||||
results = archive.resonance(min_similarity=0.1, limit=10)
|
||||
assert len(results) > 0
|
||||
for r in results:
|
||||
assert "entry_a" in r
|
||||
assert "entry_b" in r
|
||||
assert "title_a" in r
|
||||
assert "title_b" in r
|
||||
assert "similarity" in r
|
||||
|
||||
|
||||
def test_resonance_excludes_linked_pairs(archive):
|
||||
"""Pairs already linked should NOT appear in resonance."""
|
||||
results = archive.resonance(min_similarity=0.0, limit=100)
|
||||
linked_pairs = set()
|
||||
for entry in archive._entries.values():
|
||||
for linked_id in entry.links:
|
||||
pair = tuple(sorted([entry.id, linked_id]))
|
||||
linked_pairs.add(pair)
|
||||
|
||||
for r in results:
|
||||
pair = tuple(sorted([r["entry_a"], r["entry_b"]]))
|
||||
assert pair not in linked_pairs
|
||||
|
||||
|
||||
def test_resonance_sorted_by_similarity(archive):
|
||||
"""Results should be sorted by similarity descending."""
|
||||
results = archive.resonance(min_similarity=0.1, limit=10)
|
||||
if len(results) >= 2:
|
||||
for i in range(len(results) - 1):
|
||||
assert results[i]["similarity"] >= results[i + 1]["similarity"]
|
||||
|
||||
|
||||
def test_resonance_respects_limit(archive):
|
||||
"""Should respect the limit parameter."""
|
||||
results_3 = archive.resonance(min_similarity=0.0, limit=3)
|
||||
results_10 = archive.resonance(min_similarity=0.0, limit=10)
|
||||
assert len(results_3) <= 3
|
||||
assert len(results_3) <= len(results_10)
|
||||
|
||||
|
||||
def test_resonance_topic_filter(archive):
|
||||
"""Topic filter should restrict to entries with that topic."""
|
||||
results = archive.resonance(min_similarity=0.0, limit=100, topic="cooking")
|
||||
for r in results:
|
||||
entry_a = archive.get(r["entry_a"])
|
||||
entry_b = archive.get(r["entry_b"])
|
||||
assert "cooking" in entry_a.topics or "cooking" in entry_b.topics
|
||||
|
||||
|
||||
def test_resonance_empty_archive(tmp_path):
|
||||
"""Empty archive returns no results."""
|
||||
path = tmp_path / "empty_archive.json"
|
||||
arch = MnemosyneArchive(archive_path=path)
|
||||
results = arch.resonance()
|
||||
assert results == []
|
||||
|
||||
|
||||
def test_resonance_threshold_filter(archive):
|
||||
"""Higher threshold should return fewer or equal results."""
|
||||
low = archive.resonance(min_similarity=0.1, limit=100)
|
||||
high = archive.resonance(min_similarity=0.5, limit=100)
|
||||
assert len(high) <= len(low)
|
||||
for r in high:
|
||||
assert r["similarity"] >= 0.5
|
||||
@@ -1,139 +0,0 @@
|
||||
"""Tests for Mnemosyne archive snapshot — create, list, restore, diff."""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
from nexus.mnemosyne.snapshot import (
|
||||
snapshot_create,
|
||||
snapshot_list,
|
||||
snapshot_restore,
|
||||
snapshot_diff,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def archive(tmp_path):
|
||||
"""Create a fresh archive with a few entries."""
|
||||
path = tmp_path / "test_archive.json"
|
||||
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
|
||||
ingest_event(arch, title="First", content="hello world", topics=["test"])
|
||||
ingest_event(arch, title="Second", content="another entry", topics=["demo"])
|
||||
ingest_event(arch, title="Third", content="more content here", topics=["test", "demo"])
|
||||
return arch
|
||||
|
||||
|
||||
class TestSnapshotCreate:
|
||||
def test_create_returns_metadata(self, archive):
|
||||
result = snapshot_create(archive, label="test snap")
|
||||
assert "snapshot_id" in result
|
||||
assert result["label"] == "test snap"
|
||||
assert result["entry_count"] == 3
|
||||
assert Path(result["path"]).exists()
|
||||
|
||||
def test_create_no_label(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
assert result["label"] == ""
|
||||
|
||||
def test_snapshot_file_is_valid_json(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
with open(result["path"]) as f:
|
||||
data = json.load(f)
|
||||
assert data["entry_count"] == 3
|
||||
assert len(data["entries"]) == 3
|
||||
assert "created_at" in data
|
||||
|
||||
|
||||
class TestSnapshotList:
|
||||
def test_empty_list(self, archive):
|
||||
# Snapshots dir doesn't exist yet (no snapshots created)
|
||||
# Actually, create() makes the dir, so list before any create:
|
||||
snaps = snapshot_list(archive)
|
||||
assert snaps == []
|
||||
|
||||
def test_list_returns_created_snapshots(self, archive):
|
||||
snapshot_create(archive, label="first")
|
||||
snapshot_create(archive, label="second")
|
||||
snaps = snapshot_list(archive)
|
||||
assert len(snaps) == 2
|
||||
# Newest first
|
||||
assert snaps[0]["label"] == "second"
|
||||
assert snaps[1]["label"] == "first"
|
||||
|
||||
def test_list_entry_count(self, archive):
|
||||
snapshot_create(archive)
|
||||
snaps = snapshot_list(archive)
|
||||
assert snaps[0]["entry_count"] == 3
|
||||
|
||||
|
||||
class TestSnapshotRestore:
|
||||
def test_restore_replaces_entries(self, archive):
|
||||
result = snapshot_create(archive, label="before change")
|
||||
sid = result["snapshot_id"]
|
||||
|
||||
# Add more entries
|
||||
ingest_event(archive, title="Fourth", content="new entry", topics=["new"])
|
||||
assert archive.count == 4
|
||||
|
||||
# Restore
|
||||
restore_result = snapshot_restore(archive, sid)
|
||||
assert restore_result["restored_entries"] == 3
|
||||
assert restore_result["previous_count"] == 4
|
||||
assert archive.count == 3
|
||||
|
||||
def test_restore_prefix_match(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
sid = result["snapshot_id"]
|
||||
# Use just first 4 chars
|
||||
restore_result = snapshot_restore(archive, sid[:4])
|
||||
assert restore_result["snapshot_id"] == sid
|
||||
|
||||
def test_restore_nonexistent_raises(self, archive):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
snapshot_restore(archive, "nonexistent-id")
|
||||
|
||||
def test_restore_preserves_content(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
original_titles = sorted(e.title for e in archive._entries.values())
|
||||
|
||||
ingest_event(archive, title="Extra", content="extra", topics=[])
|
||||
snapshot_restore(archive, result["snapshot_id"])
|
||||
|
||||
restored_titles = sorted(e.title for e in archive._entries.values())
|
||||
assert restored_titles == original_titles
|
||||
|
||||
|
||||
class TestSnapshotDiff:
|
||||
def test_diff_identical(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
diff = snapshot_diff(archive, result["snapshot_id"])
|
||||
assert diff["added"] == 0
|
||||
assert diff["removed"] == 0
|
||||
assert diff["changed"] == 0
|
||||
|
||||
def test_diff_added_entries(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
ingest_event(archive, title="New Entry", content="new", topics=["new"])
|
||||
diff = snapshot_diff(archive, result["snapshot_id"])
|
||||
assert diff["added"] == 1
|
||||
assert diff["removed"] == 0
|
||||
assert diff["current_entries"] == 4
|
||||
assert diff["snapshot_entries"] == 3
|
||||
|
||||
def test_diff_removed_entries(self, archive):
|
||||
result = snapshot_create(archive)
|
||||
# Remove an entry
|
||||
first_id = list(archive._entries.keys())[0]
|
||||
archive.remove(first_id)
|
||||
diff = snapshot_diff(archive, result["snapshot_id"])
|
||||
assert diff["removed"] == 1
|
||||
assert first_id in diff["removed_ids"]
|
||||
|
||||
def test_diff_nonexistent_raises(self, archive):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
snapshot_diff(archive, "nope")
|
||||
240
nexus/mnemosyne/tests/test_snapshots.py
Normal file
240
nexus/mnemosyne/tests/test_snapshots.py
Normal file
@@ -0,0 +1,240 @@
|
||||
"""Tests for Mnemosyne snapshot (point-in-time backup/restore) feature."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
|
||||
|
||||
def _make_archive(tmp_dir: str) -> MnemosyneArchive:
|
||||
path = Path(tmp_dir) / "archive.json"
|
||||
return MnemosyneArchive(archive_path=path, auto_embed=False)
|
||||
|
||||
|
||||
# ─── snapshot_create ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_snapshot_create_returns_metadata():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Alpha", content="First entry", topics=["a"])
|
||||
ingest_event(archive, title="Beta", content="Second entry", topics=["b"])
|
||||
|
||||
result = archive.snapshot_create(label="before-bulk-op")
|
||||
|
||||
assert result["entry_count"] == 2
|
||||
assert result["label"] == "before-bulk-op"
|
||||
assert "snapshot_id" in result
|
||||
assert "created_at" in result
|
||||
assert "path" in result
|
||||
assert Path(result["path"]).exists()
|
||||
|
||||
|
||||
def test_snapshot_create_no_label():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Gamma", content="Third entry", topics=[])
|
||||
|
||||
result = archive.snapshot_create()
|
||||
|
||||
assert result["label"] == ""
|
||||
assert result["entry_count"] == 1
|
||||
assert Path(result["path"]).exists()
|
||||
|
||||
|
||||
def test_snapshot_file_contains_entries():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
e = ingest_event(archive, title="Delta", content="Fourth entry", topics=["d"])
|
||||
result = archive.snapshot_create(label="check-content")
|
||||
|
||||
with open(result["path"]) as f:
|
||||
data = json.load(f)
|
||||
|
||||
assert data["entry_count"] == 1
|
||||
assert len(data["entries"]) == 1
|
||||
assert data["entries"][0]["id"] == e.id
|
||||
assert data["entries"][0]["title"] == "Delta"
|
||||
|
||||
|
||||
def test_snapshot_create_empty_archive():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
result = archive.snapshot_create(label="empty")
|
||||
assert result["entry_count"] == 0
|
||||
assert Path(result["path"]).exists()
|
||||
|
||||
|
||||
# ─── snapshot_list ───────────────────────────────────────────────────────────
|
||||
|
||||
def test_snapshot_list_empty():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
assert archive.snapshot_list() == []
|
||||
|
||||
|
||||
def test_snapshot_list_returns_all():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="One", content="c1", topics=[])
|
||||
archive.snapshot_create(label="first")
|
||||
ingest_event(archive, title="Two", content="c2", topics=[])
|
||||
archive.snapshot_create(label="second")
|
||||
|
||||
snapshots = archive.snapshot_list()
|
||||
assert len(snapshots) == 2
|
||||
labels = {s["label"] for s in snapshots}
|
||||
assert "first" in labels
|
||||
assert "second" in labels
|
||||
|
||||
|
||||
def test_snapshot_list_metadata_fields():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
archive.snapshot_create(label="meta-check")
|
||||
snapshots = archive.snapshot_list()
|
||||
s = snapshots[0]
|
||||
for key in ("snapshot_id", "label", "created_at", "entry_count", "path"):
|
||||
assert key in s
|
||||
|
||||
|
||||
def test_snapshot_list_newest_first():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
archive.snapshot_create(label="a")
|
||||
archive.snapshot_create(label="b")
|
||||
snapshots = archive.snapshot_list()
|
||||
# Filenames sort lexicographically; newest (b) should be first
|
||||
# (filenames include timestamp so alphabetical = newest-last;
|
||||
# snapshot_list reverses the glob order → newest first)
|
||||
assert len(snapshots) == 2
|
||||
# Both should be present; ordering is newest first
|
||||
ids = [s["snapshot_id"] for s in snapshots]
|
||||
assert ids == sorted(ids, reverse=True)
|
||||
|
||||
|
||||
# ─── snapshot_restore ────────────────────────────────────────────────────────
|
||||
|
||||
def test_snapshot_restore_replaces_entries():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Kept", content="original content", topics=["orig"])
|
||||
snap = archive.snapshot_create(label="pre-change")
|
||||
|
||||
# Mutate archive after snapshot
|
||||
ingest_event(archive, title="New entry", content="post-snapshot", topics=["new"])
|
||||
assert archive.count == 2
|
||||
|
||||
result = archive.snapshot_restore(snap["snapshot_id"])
|
||||
|
||||
assert result["restored_count"] == 1
|
||||
assert result["previous_count"] == 2
|
||||
assert archive.count == 1
|
||||
entry = list(archive._entries.values())[0]
|
||||
assert entry.title == "Kept"
|
||||
|
||||
|
||||
def test_snapshot_restore_persists_to_disk():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = Path(tmp) / "archive.json"
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Persisted", content="should survive reload", topics=[])
|
||||
snap = archive.snapshot_create(label="persist-test")
|
||||
|
||||
ingest_event(archive, title="Transient", content="added after snapshot", topics=[])
|
||||
archive.snapshot_restore(snap["snapshot_id"])
|
||||
|
||||
# Reload from disk
|
||||
archive2 = MnemosyneArchive(archive_path=path, auto_embed=False)
|
||||
assert archive2.count == 1
|
||||
assert list(archive2._entries.values())[0].title == "Persisted"
|
||||
|
||||
|
||||
def test_snapshot_restore_missing_raises():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
with pytest.raises(FileNotFoundError):
|
||||
archive.snapshot_restore("nonexistent_snapshot_id")
|
||||
|
||||
|
||||
# ─── snapshot_diff ───────────────────────────────────────────────────────────
|
||||
|
||||
def test_snapshot_diff_no_changes():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Stable", content="unchanged content", topics=[])
|
||||
snap = archive.snapshot_create(label="baseline")
|
||||
|
||||
diff = archive.snapshot_diff(snap["snapshot_id"])
|
||||
|
||||
assert diff["added"] == []
|
||||
assert diff["removed"] == []
|
||||
assert diff["modified"] == []
|
||||
assert diff["unchanged"] == 1
|
||||
|
||||
|
||||
def test_snapshot_diff_detects_added():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
ingest_event(archive, title="Original", content="existing", topics=[])
|
||||
snap = archive.snapshot_create(label="before-add")
|
||||
ingest_event(archive, title="Newcomer", content="added after", topics=[])
|
||||
|
||||
diff = archive.snapshot_diff(snap["snapshot_id"])
|
||||
|
||||
assert len(diff["added"]) == 1
|
||||
assert diff["added"][0]["title"] == "Newcomer"
|
||||
assert diff["removed"] == []
|
||||
assert diff["unchanged"] == 1
|
||||
|
||||
|
||||
def test_snapshot_diff_detects_removed():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
e1 = ingest_event(archive, title="Will Be Removed", content="doomed", topics=[])
|
||||
ingest_event(archive, title="Survivor", content="stays", topics=[])
|
||||
snap = archive.snapshot_create(label="pre-removal")
|
||||
archive.remove(e1.id)
|
||||
|
||||
diff = archive.snapshot_diff(snap["snapshot_id"])
|
||||
|
||||
assert len(diff["removed"]) == 1
|
||||
assert diff["removed"][0]["title"] == "Will Be Removed"
|
||||
assert diff["added"] == []
|
||||
assert diff["unchanged"] == 1
|
||||
|
||||
|
||||
def test_snapshot_diff_detects_modified():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
e = ingest_event(archive, title="Mutable", content="original content", topics=[])
|
||||
snap = archive.snapshot_create(label="pre-edit")
|
||||
archive.update_entry(e.id, content="updated content", auto_link=False)
|
||||
|
||||
diff = archive.snapshot_diff(snap["snapshot_id"])
|
||||
|
||||
assert len(diff["modified"]) == 1
|
||||
assert diff["modified"][0]["title"] == "Mutable"
|
||||
assert diff["modified"][0]["snapshot_hash"] != diff["modified"][0]["current_hash"]
|
||||
assert diff["added"] == []
|
||||
assert diff["removed"] == []
|
||||
|
||||
|
||||
def test_snapshot_diff_missing_raises():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
with pytest.raises(FileNotFoundError):
|
||||
archive.snapshot_diff("no_such_snapshot")
|
||||
|
||||
|
||||
def test_snapshot_diff_includes_snapshot_id():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _make_archive(tmp)
|
||||
snap = archive.snapshot_create(label="id-check")
|
||||
diff = archive.snapshot_diff(snap["snapshot_id"])
|
||||
assert diff["snapshot_id"] == snap["snapshot_id"]
|
||||
Reference in New Issue
Block a user