# Provenance: reconstructed from git patch bbdf4fbbff9328875877b045fc70caa2b1aa1edd
# "feat: add archive snapshot module (#1268)" by Alexander Whitestone,
# Sun, 12 Apr 2026 09:40:54 +0000 (new file nexus/mnemosyne/snapshot.py).
# Commit message: Point-in-time backup and restore for Mnemosyne.
# snapshot_create, snapshot_list, snapshot_restore, snapshot_diff.
"""Archive snapshot — point-in-time backup and restore.

Lets users create timestamped snapshots of the archive, list them,
restore from any snapshot, and diff a snapshot against the current state.
Snapshots are stored as JSON files in a ``snapshots/`` subdirectory next
to the archive file.
"""

from __future__ import annotations

import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry


def _snapshots_dir(archive: MnemosyneArchive) -> Path:
    """Return the snapshots directory, creating it if needed."""
    d = archive.path.parent / "snapshots"
    d.mkdir(parents=True, exist_ok=True)
    return d


def _resolve_snapshot_path(archive: MnemosyneArchive, snapshot_id: str) -> Path:
    """Resolve a snapshot ID (or unique ID prefix) to its JSON file.

    Matching is done against the stems of the ``*.json`` files in the
    snapshots directory only; ``snapshot_id`` is never joined into a path,
    so it cannot be used to reach files outside that directory.

    An exact ID match always wins. Otherwise the prefix must identify
    exactly one snapshot.

    Raises:
        FileNotFoundError: If ``snapshot_id`` is empty, matches no snapshot,
            or is a prefix shared by more than one snapshot.
    """
    directory = _snapshots_dir(archive)

    # An empty string is a prefix of every stem, so it identifies nothing.
    if not snapshot_id:
        raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")

    # Sorted so that behaviour never depends on filesystem iteration order.
    candidates = sorted(
        p for p in directory.glob("*.json") if p.stem.startswith(snapshot_id)
    )
    if not candidates:
        raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")

    for candidate in candidates:
        if candidate.stem == snapshot_id:
            return candidate

    if len(candidates) > 1:
        matches = ", ".join(p.stem for p in candidates)
        raise FileNotFoundError(
            f"Snapshot prefix '{snapshot_id}' is ambiguous; matches: {matches}"
        )
    return candidates[0]


def _load_snapshot(archive: MnemosyneArchive, snapshot_id: str) -> dict:
    """Locate the snapshot for ``snapshot_id`` and return its parsed JSON.

    Raises:
        FileNotFoundError: If no unique matching snapshot is found.
    """
    snapshot_path = _resolve_snapshot_path(archive, snapshot_id)
    # Snapshots are written with json's default ensure_ascii=True, so they
    # are pure ASCII and decode identically under UTF-8 on every platform.
    with open(snapshot_path, encoding="utf-8") as fh:
        return json.load(fh)


def snapshot_create(
    archive: MnemosyneArchive,
    label: Optional[str] = None,
) -> dict:
    """Create a point-in-time snapshot of the archive.

    The snapshot is written as ``<snapshot_id>.json`` in the snapshots
    directory. An existing snapshot file is never overwritten: if the
    generated ID is already taken, a new one is drawn.

    Args:
        archive: The archive to snapshot.
        label: Optional human-readable label for the snapshot.

    Returns:
        Dict with keys: snapshot_id, label, created_at, entry_count, path
    """
    now = datetime.now(timezone.utc).isoformat()
    label_text = label or ""
    entry_count = archive.count
    entries = [e.to_dict() for e in archive._entries.values()]
    directory = _snapshots_dir(archive)

    while True:
        snapshot_id = str(uuid.uuid4())[:8]
        path = directory / f"{snapshot_id}.json"
        data = {
            "snapshot_id": snapshot_id,
            "label": label_text,
            "created_at": now,
            "entry_count": entry_count,
            "entries": entries,
        }
        # Serialize before touching the filesystem so that a serialization
        # error cannot leave a truncated snapshot file behind.
        payload = json.dumps(data, indent=2)
        try:
            # "x" = exclusive create: an 8-character ID can collide, and a
            # collision must not clobber an existing backup.
            with open(path, "x", encoding="utf-8") as f:
                f.write(payload)
        except FileExistsError:
            continue
        break

    return {
        "snapshot_id": snapshot_id,
        "label": label_text,
        "created_at": now,
        "entry_count": entry_count,
        "path": str(path),
    }


def snapshot_list(archive: MnemosyneArchive) -> list[dict]:
    """List all available snapshots, newest first.

    Files in the snapshots directory that cannot be read, are not valid
    JSON, or lack the required metadata keys are skipped (best effort).

    Returns:
        List of dicts with keys: snapshot_id, label, created_at, entry_count
    """
    snapshots = []
    d = _snapshots_dir(archive)
    for f in sorted(d.glob("*.json")):
        try:
            with open(f, encoding="utf-8") as fh:
                meta = json.load(fh)
            snapshots.append({
                "snapshot_id": meta["snapshot_id"],
                "label": meta.get("label", ""),
                "created_at": meta["created_at"],
                "entry_count": meta["entry_count"],
            })
        except (OSError, ValueError, KeyError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # TypeError covers JSON whose top level is not an object.
            continue
    # Newest first (ISO-8601 timestamps; the sort is stable, so ties keep
    # the filename order established above).
    snapshots.sort(key=lambda s: s["created_at"], reverse=True)
    return snapshots


def snapshot_restore(
    archive: MnemosyneArchive,
    snapshot_id: str,
) -> dict:
    """Restore the archive from a snapshot.

    Replaces ALL current entries with the snapshot data. The archive is
    saved immediately after restore. The snapshot is fully parsed before
    the archive is modified, so a malformed snapshot leaves the archive's
    current entries untouched.

    Args:
        archive: The archive to restore into.
        snapshot_id: ID of the snapshot to restore (or unique prefix).

    Returns:
        Dict with keys: snapshot_id, label, restored_entries, previous_count

    Raises:
        FileNotFoundError: If no matching snapshot is found, or the prefix
            matches more than one snapshot.
    """
    data = _load_snapshot(archive, snapshot_id)
    previous_count = archive.count

    # Build the replacement mapping first; only swap it in once every
    # entry has been deserialized successfully.
    restored = {}
    for entry_data in data["entries"]:
        entry = ArchiveEntry.from_dict(entry_data)
        restored[entry.id] = entry

    archive._entries = restored
    archive._save()

    return {
        "snapshot_id": data["snapshot_id"],
        "label": data.get("label", ""),
        "restored_entries": len(restored),
        "previous_count": previous_count,
    }


def snapshot_diff(
    archive: MnemosyneArchive,
    snapshot_id: str,
) -> dict:
    """Compare a snapshot against the current archive state.

    Args:
        archive: The current archive.
        snapshot_id: ID of the snapshot to compare (or unique prefix).

    Returns:
        Dict with keys: snapshot_id, label, snapshot_entries,
        current_entries, added / added_ids (in current but not snapshot),
        removed / removed_ids (in snapshot but not current),
        changed / changed_details (same ID but different content_hash).
        All ID lists and ``changed_details`` are sorted by entry ID.

    Raises:
        FileNotFoundError: If no matching snapshot is found, or the prefix
            matches more than one snapshot.
    """
    data = _load_snapshot(archive, snapshot_id)

    snap_entries = {e["id"]: e for e in data["entries"]}
    curr_entries = {e.id: e.to_dict() for e in archive._entries.values()}

    snap_ids = set(snap_entries)
    curr_ids = set(curr_entries)

    added_ids = curr_ids - snap_ids
    removed_ids = snap_ids - curr_ids
    common_ids = snap_ids & curr_ids

    changed = []
    # Sorted so the report is reproducible (set iteration order is not).
    for eid in sorted(common_ids):
        snap_hash = snap_entries[eid].get("content_hash", "")
        curr_hash = curr_entries[eid].get("content_hash", "")
        if snap_hash != curr_hash:
            changed.append({
                "id": eid,
                "title": curr_entries[eid].get("title", ""),
                "snapshot_hash": snap_hash,
                "current_hash": curr_hash,
            })

    return {
        "snapshot_id": data["snapshot_id"],
        "label": data.get("label", ""),
        "snapshot_entries": len(snap_entries),
        "current_entries": len(curr_entries),
        "added": len(added_ids),
        "removed": len(removed_ids),
        "changed": len(changed),
        "added_ids": sorted(added_ids),
        "removed_ids": sorted(removed_ids),
        "changed_details": changed,
    }