32 lines
1.4 KiB
Python
32 lines
1.4 KiB
Python
"""Archive snapshot — point-in-time backup and restore."""
|
|
import json, uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
def snapshot_create(archive, label=None):
|
|
sid = str(uuid.uuid4())[:8]
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
data = {"snapshot_id": sid, "label": label or "", "created_at": now, "entries": [e.to_dict() for e in archive._entries.values()]}
|
|
path = archive.path.parent / "snapshots" / f"{sid}.json"
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(path, "w") as f: json.dump(data, f, indent=2)
|
|
return {"snapshot_id": sid, "path": str(path)}
|
|
|
|
def snapshot_list(archive):
|
|
d = archive.path.parent / "snapshots"
|
|
if not d.exists(): return []
|
|
snaps = []
|
|
for f in d.glob("*.json"):
|
|
with open(f) as fh: meta = json.load(fh)
|
|
snaps.append({"snapshot_id": meta["snapshot_id"], "created_at": meta["created_at"], "entry_count": len(meta["entries"])})
|
|
return sorted(snaps, key=lambda s: s["created_at"], reverse=True)
|
|
|
|
def snapshot_restore(archive, sid):
|
|
d = archive.path.parent / "snapshots"
|
|
f = next((x for x in d.glob("*.json") if x.stem.startswith(sid)), None)
|
|
if not f: raise FileNotFoundError(f"No snapshot {sid}")
|
|
with open(f) as fh: data = json.load(fh)
|
|
archive._entries = {e["id"]: ArchiveEntry.from_dict(e) for e in data["entries"]}
|
|
archive._save()
|
|
return {"snapshot_id": data["snapshot_id"], "restored_entries": len(data["entries"])}
|