feat: add archive snapshot module (#1268)

Point-in-time backup and restore for Mnemosyne.
snapshot_create, snapshot_list, snapshot_restore, snapshot_diff.
This commit is contained in:
2026-04-12 09:40:54 +00:00
parent bb21beccdd
commit bbdf4fbbff

206
nexus/mnemosyne/snapshot.py Normal file
View File

@@ -0,0 +1,206 @@
"""Archive snapshot — point-in-time backup and restore.
Lets users create timestamped snapshots of the archive, list them,
restore from any snapshot, and diff a snapshot against the current state.
Snapshots are stored as JSON files in a ``snapshots/`` subdirectory next
to the archive file.
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _snapshots_dir(archive: MnemosyneArchive) -> Path:
    """Locate the ``snapshots`` folder that sits beside the archive file.

    The folder (and any missing parents) is created on demand, so callers
    can use the returned path immediately.

    Args:
        archive: Archive whose ``path`` attribute points at the archive file.

    Returns:
        Path of the ``snapshots`` directory in the archive file's parent.
    """
    snapshots_path = archive.path.parent.joinpath("snapshots")
    # exist_ok makes repeated calls harmless.
    snapshots_path.mkdir(parents=True, exist_ok=True)
    return snapshots_path
def snapshot_create(
    archive: MnemosyneArchive,
    label: Optional[str] = None,
) -> dict:
    """Create a point-in-time snapshot of the archive.

    The snapshot is written as ``<snapshot_id>.json`` inside the snapshots
    directory. An existing snapshot file is never overwritten: if the
    generated ID is already taken, a new ID is drawn.

    Args:
        archive: The archive to snapshot.
        label: Optional human-readable label for the snapshot.

    Returns:
        Dict with keys: snapshot_id, label, created_at, entry_count, path
    """
    summary = {
        "snapshot_id": "",  # filled in once a free ID has been found
        "label": label or "",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "entry_count": archive.count,
    }
    entries = [e.to_dict() for e in archive._entries.values()]
    directory = _snapshots_dir(archive)

    while True:
        snapshot_id = str(uuid.uuid4())[:8]
        summary["snapshot_id"] = snapshot_id
        path = directory / f"{snapshot_id}.json"
        # Serialize before touching the filesystem so a serialization error
        # cannot leave a truncated snapshot file behind.
        payload = json.dumps({**summary, "entries": entries}, indent=2)
        try:
            # "x" fails if the file exists: the 8-character ID is short
            # enough to collide, and a collision must not clobber a backup.
            with open(path, "x", encoding="utf-8") as f:
                f.write(payload)
        except FileExistsError:
            continue
        break

    return {**summary, "path": str(path)}
def snapshot_list(archive: MnemosyneArchive) -> list[dict]:
    """List all available snapshots, newest first.

    Listing is best-effort: a ``*.json`` file in the snapshots directory
    that is not valid UTF-8 JSON, is not a JSON object, or lacks a required
    metadata key is skipped rather than aborting the whole listing.

    Args:
        archive: The archive whose snapshots directory is scanned.

    Returns:
        List of dicts with keys: snapshot_id, label, created_at, entry_count
    """
    snapshots = []
    # Iterate in sorted path order so that snapshots sharing a timestamp keep
    # a deterministic relative order after the stable sort below.
    for path in sorted(_snapshots_dir(archive).glob("*.json")):
        try:
            with open(path, encoding="utf-8") as fh:
                meta = json.load(fh)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        if not isinstance(meta, dict):
            # Valid JSON but not a snapshot document (e.g. a list or null).
            continue
        try:
            snapshots.append({
                "snapshot_id": meta["snapshot_id"],
                "label": meta.get("label", ""),
                "created_at": meta["created_at"],
                "entry_count": meta["entry_count"],
            })
        except KeyError:
            continue
    # Newest first
    snapshots.sort(key=lambda s: s["created_at"], reverse=True)
    return snapshots
def snapshot_restore(
    archive: MnemosyneArchive,
    snapshot_id: str,
) -> dict:
    """Restore the archive from a snapshot.

    Replaces ALL current entries with the snapshot data. The archive is
    saved immediately after restore. The snapshot is fully parsed before
    the archive is touched, so a malformed snapshot leaves the current
    entries intact.

    Args:
        archive: The archive to restore into.
        snapshot_id: ID of the snapshot to restore (or unique prefix). An
            exact ID match takes precedence over prefix matches.

    Returns:
        Dict with keys: snapshot_id, label, restored_entries, previous_count

    Raises:
        FileNotFoundError: If no matching snapshot is found, or if the
            prefix matches more than one snapshot.
    """
    directory = _snapshots_dir(archive)

    # An empty ID is a prefix of every name; treat it as "no match" instead
    # of restoring whichever snapshot happens to be found.
    matches = []
    if snapshot_id:
        matches = [
            p for p in sorted(directory.glob("*.json"))
            if p.stem.startswith(snapshot_id)
        ]
    exact = [p for p in matches if p.stem == snapshot_id]
    if exact:
        matches = exact
    if not matches:
        raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
    if len(matches) > 1:
        # Restoring is destructive, so never guess between candidates.
        names = ", ".join(p.stem for p in matches)
        raise FileNotFoundError(
            f"Snapshot ID prefix '{snapshot_id}' is ambiguous; matches: {names}"
        )
    snapshot_path = matches[0]

    with open(snapshot_path, encoding="utf-8") as fh:
        data = json.load(fh)

    # Build the replacement mapping and the result up front; any error in
    # the snapshot data surfaces here, before the archive is modified.
    restored = {}
    for entry_data in data["entries"]:
        entry = ArchiveEntry.from_dict(entry_data)
        restored[entry.id] = entry
    result = {
        "snapshot_id": data["snapshot_id"],
        "label": data.get("label", ""),
        "restored_entries": len(data["entries"]),
        "previous_count": archive.count,
    }

    archive._entries = restored
    archive._save()
    return result
def snapshot_diff(
    archive: MnemosyneArchive,
    snapshot_id: str,
) -> dict:
    """Compare a snapshot against the current archive state.

    Args:
        archive: The current archive.
        snapshot_id: ID of the snapshot to compare (or unique prefix). An
            exact ID match takes precedence over prefix matches.

    Returns:
        Dict with keys: snapshot_id, label, snapshot_entries,
        current_entries, added (count in current but not snapshot),
        removed (count in snapshot but not current), changed (count with
        same ID but different content_hash), added_ids, removed_ids (sorted
        ID lists) and changed_details (list of dicts with id, title,
        snapshot_hash, current_hash, ordered by ID).

    Raises:
        FileNotFoundError: If no matching snapshot is found, or if the
            prefix matches more than one snapshot.
    """
    directory = _snapshots_dir(archive)

    # An empty ID is a prefix of every name; treat it as "no match" instead
    # of diffing against whichever snapshot happens to be found.
    matches = []
    if snapshot_id:
        matches = [
            p for p in sorted(directory.glob("*.json"))
            if p.stem.startswith(snapshot_id)
        ]
    exact = [p for p in matches if p.stem == snapshot_id]
    if exact:
        matches = exact
    if not matches:
        raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
    if len(matches) > 1:
        names = ", ".join(p.stem for p in matches)
        raise FileNotFoundError(
            f"Snapshot ID prefix '{snapshot_id}' is ambiguous; matches: {names}"
        )
    snapshot_path = matches[0]

    with open(snapshot_path, encoding="utf-8") as fh:
        data = json.load(fh)

    snap_entries = {e["id"]: e for e in data["entries"]}
    curr_entries = {e.id: e.to_dict() for e in archive._entries.values()}

    snap_ids = set(snap_entries)
    curr_ids = set(curr_entries)
    added_ids = curr_ids - snap_ids
    removed_ids = snap_ids - curr_ids
    common_ids = snap_ids & curr_ids

    changed = []
    # Sorted so changed_details has a stable order, like added_ids and
    # removed_ids; raw set iteration order can differ between runs.
    for eid in sorted(common_ids):
        snap_hash = snap_entries[eid].get("content_hash", "")
        curr_hash = curr_entries[eid].get("content_hash", "")
        if snap_hash != curr_hash:
            changed.append({
                "id": eid,
                "title": curr_entries[eid].get("title", ""),
                "snapshot_hash": snap_hash,
                "current_hash": curr_hash,
            })

    return {
        "snapshot_id": data["snapshot_id"],
        "label": data.get("label", ""),
        "snapshot_entries": len(snap_entries),
        "current_entries": len(curr_entries),
        "added": len(added_ids),
        "removed": len(removed_ids),
        "changed": len(changed),
        "added_ids": sorted(added_ids),
        "removed_ids": sorted(removed_ids),
        "changed_details": changed,
    }