feat: add snapshot CLI commands (#1268)
mnemosyne snapshot create|list|restore|diff
This commit is contained in:
@@ -6,7 +6,8 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
|
||||
mnemosyne tag, mnemosyne untag, mnemosyne retag,
|
||||
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
|
||||
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
|
||||
mnemosyne fading, mnemosyne vibrant
|
||||
mnemosyne fading, mnemosyne vibrant,
|
||||
mnemosyne snapshot create|list|restore|diff
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -18,6 +19,7 @@ import sys
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
from nexus.mnemosyne.snapshot import snapshot_create, snapshot_list, snapshot_restore, snapshot_diff
|
||||
|
||||
|
||||
def cmd_stats(args):
|
||||
@@ -315,6 +317,58 @@ def cmd_vibrant(args):
|
||||
print()
|
||||
|
||||
|
||||
|
||||
def cmd_snapshot_create(args):
|
||||
archive = MnemosyneArchive()
|
||||
result = snapshot_create(archive, label=args.label)
|
||||
print(f"Snapshot created: {result['snapshot_id']}")
|
||||
print(f" Entries: {result['entry_count']}")
|
||||
print(f" Label: {result['label'] or '(none)'}")
|
||||
print(f" Path: {result['path']}")
|
||||
|
||||
|
||||
def cmd_snapshot_list(args):
|
||||
archive = MnemosyneArchive()
|
||||
snaps = snapshot_list(archive)
|
||||
if not snaps:
|
||||
print("No snapshots found.")
|
||||
return
|
||||
for s in snaps:
|
||||
label = f" ({s['label']})" if s['label'] else ""
|
||||
print(f" {s['snapshot_id']} {s['created_at'][:19]} {s['entry_count']} entries{label}")
|
||||
|
||||
|
||||
def cmd_snapshot_restore(args):
|
||||
archive = MnemosyneArchive()
|
||||
try:
|
||||
result = snapshot_restore(archive, args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Restored snapshot {result['snapshot_id']}")
|
||||
print(f" Entries restored: {result['restored_entries']}")
|
||||
print(f" Previous count: {result['previous_count']}")
|
||||
|
||||
|
||||
def cmd_snapshot_diff(args):
|
||||
archive = MnemosyneArchive()
|
||||
try:
|
||||
result = snapshot_diff(archive, args.snapshot_id)
|
||||
except FileNotFoundError as e:
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
print(f"Diff: snapshot {result['snapshot_id']} vs current")
|
||||
print(f" Snapshot: {result['snapshot_entries']} entries")
|
||||
print(f" Current: {result['current_entries']} entries")
|
||||
print(f" Added: {result['added']}")
|
||||
print(f" Removed: {result['removed']}")
|
||||
print(f" Changed: {result['changed']}")
|
||||
if result['changed_details']:
|
||||
print()
|
||||
for c in result['changed_details']:
|
||||
print(f" [{c['id'][:8]}] {c['title']}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
@@ -406,6 +460,21 @@ def main():
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
# Snapshot subcommands
|
||||
sp = sub.add_parser("snapshot", help="Archive snapshot operations")
|
||||
sp_sub = sp.add_subparsers(dest="snapshot_command")
|
||||
|
||||
sp_create = sp_sub.add_parser("create", help="Create a point-in-time snapshot")
|
||||
sp_create.add_argument("-l", "--label", default="", help="Human-readable label")
|
||||
|
||||
sp_sub.add_parser("list", help="List available snapshots")
|
||||
|
||||
sp_restore = sp_sub.add_parser("restore", help="Restore from a snapshot")
|
||||
sp_restore.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
|
||||
|
||||
sp_diff = sp_sub.add_parser("diff", help="Diff snapshot vs current archive")
|
||||
sp_diff.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
|
||||
|
||||
dispatch = {
|
||||
"stats": cmd_stats,
|
||||
"search": cmd_search,
|
||||
@@ -430,9 +499,26 @@ def main():
|
||||
"vitality": cmd_vitality,
|
||||
"fading": cmd_fading,
|
||||
"vibrant": cmd_vibrant,
|
||||
"snapshot": lambda args: _dispatch_snapshot(args),
|
||||
}
|
||||
dispatch[args.command](args)
|
||||
|
||||
|
||||
def _dispatch_snapshot(args):
|
||||
"""Route snapshot subcommands to handlers."""
|
||||
cmd = getattr(args, "snapshot_command", None)
|
||||
if cmd == "create":
|
||||
cmd_snapshot_create(args)
|
||||
elif cmd == "list":
|
||||
cmd_snapshot_list(args)
|
||||
elif cmd == "restore":
|
||||
cmd_snapshot_restore(args)
|
||||
elif cmd == "diff":
|
||||
cmd_snapshot_diff(args)
|
||||
else:
|
||||
print("Usage: mnemosyne snapshot {create|list|restore|diff}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user