Files
the-nexus/nexus/mnemosyne/cli.py
Alexander Whitestone 1437613560 feat(mnemosyne): add CLI commands and tests for temporal queries
Closes #1244

- CLI: 'timeline <start> <end>' for date range queries
- CLI: 'neighbors <entry_id> [--days N]' for temporal proximity
- Tests: 16 test cases covering parsing, ranges, boundaries,
  sorting, limits, neighbor ordering, and edge cases
2026-04-11 20:48:14 -04:00

311 lines
11 KiB
Python

"""CLI interface for Mnemosyne.
Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors
"""
from __future__ import annotations
import argparse
import json
import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
def cmd_stats(args):
archive = MnemosyneArchive()
stats = archive.stats()
print(json.dumps(stats, indent=2))
def cmd_search(args):
archive = MnemosyneArchive()
if getattr(args, "semantic", False):
results = archive.semantic_search(args.query, limit=args.limit)
else:
results = archive.search(args.query, limit=args.limit)
if not results:
print("No results found.")
return
for entry in results:
linked = len(entry.links)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Source: {entry.source} | Topics: {', '.join(entry.topics)} | Links: {linked}")
print(f" {entry.content[:120]}...")
print()
def cmd_ingest(args):
archive = MnemosyneArchive()
entry = ingest_event(
archive,
title=args.title,
content=args.content,
topics=args.topics.split(",") if args.topics else [],
)
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
if not entry:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
linked = archive.get_linked(entry.id, depth=args.depth)
if not linked:
print("No linked entries found.")
return
for e in linked:
print(f" [{e.id[:8]}] {e.title} (source: {e.source})")
def cmd_topics(args):
archive = MnemosyneArchive()
counts = archive.topic_counts()
if not counts:
print("No topics found.")
return
for topic, count in counts.items():
print(f" {topic}: {count}")
def cmd_remove(args):
archive = MnemosyneArchive()
removed = archive.remove(args.entry_id)
if removed:
print(f"Removed entry: {args.entry_id}")
else:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
def cmd_export(args):
archive = MnemosyneArchive()
topics = [t.strip() for t in args.topics.split(",")] if args.topics else None
data = archive.export(query=args.query or None, topics=topics)
print(json.dumps(data, indent=2))
def cmd_clusters(args):
archive = MnemosyneArchive()
clusters = archive.graph_clusters(min_size=args.min_size)
if not clusters:
print("No clusters found.")
return
for c in clusters:
print(f"Cluster {c['cluster_id']}: {c['size']} entries, density={c['density']}")
print(f" Topics: {', '.join(c['top_topics']) if c['top_topics'] else '(none)'}")
if args.verbose:
for eid in c["entries"]:
entry = archive.get(eid)
if entry:
print(f" [{eid[:8]}] {entry.title}")
print()
def cmd_hubs(args):
archive = MnemosyneArchive()
hubs = archive.hub_entries(limit=args.limit)
if not hubs:
print("No hubs found.")
return
for h in hubs:
e = h["entry"]
print(f"[{e.id[:8]}] {e.title}")
print(f" Degree: {h['degree']} (in: {h['inbound']}, out: {h['outbound']})")
print(f" Topics: {', '.join(h['topics']) if h['topics'] else '(none)'}")
print()
def cmd_bridges(args):
archive = MnemosyneArchive()
bridges = archive.bridge_entries()
if not bridges:
print("No bridge entries found.")
return
for b in bridges:
e = b["entry"]
print(f"[{e.id[:8]}] {e.title}")
print(f" Bridges {b['components_after_removal']} components (cluster: {b['cluster_size']} entries)")
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
print()
def cmd_rebuild(args):
archive = MnemosyneArchive()
threshold = args.threshold if args.threshold else None
total = archive.rebuild_links(threshold=threshold)
print(f"Rebuilt links: {total} connections across {archive.count} entries")
def cmd_tag(args):
archive = MnemosyneArchive()
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
try:
entry = archive.add_tags(args.entry_id, tags)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Topics: {', '.join(entry.topics) if entry.topics else '(none)'}")
def cmd_untag(args):
archive = MnemosyneArchive()
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
try:
entry = archive.remove_tags(args.entry_id, tags)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Topics: {', '.join(entry.topics) if entry.topics else '(none)'}")
def cmd_retag(args):
archive = MnemosyneArchive()
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
try:
entry = archive.retag(args.entry_id, tags)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Topics: {', '.join(entry.topics) if entry.topics else '(none)'}")
def cmd_timeline(args):
archive = MnemosyneArchive()
results = archive.by_date_range(args.start, args.end, limit=args.limit)
if not results:
print(f"No entries between {args.start} and {args.end}.")
return
print(f"Timeline: {args.start}{args.end} ({len(results)} entries)")
print()
for entry in results:
print(f" [{entry.created_at[:10]}] {entry.title}")
print(f" ID: {entry.id[:8]} | Source: {entry.source} | Topics: {', '.join(entry.topics)}")
print()
def cmd_neighbors(args):
archive = MnemosyneArchive()
# Resolve prefix to full ID
matches = [e for e in archive._entries.values() if e.id.startswith(args.entry_id)]
if not matches:
print(f"No entry matching '{args.entry_id}'.")
return
if len(matches) > 1:
print(f"Ambiguous — {len(matches)} entries match '{args.entry_id}'. Use a longer prefix.")
return
entry = matches[0]
results = archive.temporal_neighbors(entry.id, window_days=args.days)
if not results:
print(f"No entries within {args.days} days of [{entry.id[:8]}] {entry.title}.")
return
print(f"Neighbors of [{entry.id[:8]}] {entry.title}{args.days} days):")
print()
for neighbor in results:
print(f" [{neighbor.created_at[:10]}] {neighbor.title}")
print(f" ID: {neighbor.id[:8]} | Source: {neighbor.source} | Topics: {', '.join(neighbor.topics)}")
print()
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="Mnemosyne — Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
sub.add_parser("stats", help="Show archive statistics")
s = sub.add_parser("search", help="Search the archive")
s.add_argument("query", help="Search query")
s.add_argument("-n", "--limit", type=int, default=10)
s.add_argument("--semantic", action="store_true", help="Use holographic linker similarity scoring")
i = sub.add_parser("ingest", help="Ingest a new entry")
i.add_argument("--title", required=True)
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
sub.add_parser("topics", help="List all topics with entry counts")
r = sub.add_parser("remove", help="Remove an entry by ID")
r.add_argument("entry_id", help="Entry ID to remove")
ex = sub.add_parser("export", help="Export filtered archive data as JSON")
ex.add_argument("-q", "--query", default="", help="Keyword filter")
ex.add_argument("-t", "--topics", default="", help="Comma-separated topic filter")
cl = sub.add_parser("clusters", help="Show graph clusters (connected components)")
cl.add_argument("-m", "--min-size", type=int, default=1, help="Minimum cluster size")
cl.add_argument("-v", "--verbose", action="store_true", help="List entries in each cluster")
hu = sub.add_parser("hubs", help="Show most connected entries (hub analysis)")
hu.add_argument("-n", "--limit", type=int, default=10, help="Max hubs to show")
sub.add_parser("bridges", help="Show bridge entries (articulation points)")
rb = sub.add_parser("rebuild", help="Recompute all links from scratch")
rb.add_argument("-t", "--threshold", type=float, default=None, help="Similarity threshold override")
tg = sub.add_parser("tag", help="Add tags to an existing entry")
tg.add_argument("entry_id", help="Entry ID")
tg.add_argument("tags", help="Comma-separated tags to add")
ut = sub.add_parser("untag", help="Remove tags from an existing entry")
ut.add_argument("entry_id", help="Entry ID")
ut.add_argument("tags", help="Comma-separated tags to remove")
rt = sub.add_parser("retag", help="Replace all tags on an existing entry")
rt.add_argument("entry_id", help="Entry ID")
rt.add_argument("tags", help="Comma-separated new tag list")
tl = sub.add_parser("timeline", help="Show entries within a date range")
tl.add_argument("start", help="Start date (YYYY-MM-DD or ISO datetime)")
tl.add_argument("end", help="End date (YYYY-MM-DD or ISO datetime)")
tl.add_argument("-n", "--limit", type=int, default=50, help="Max entries to show")
nb = sub.add_parser("neighbors", help="Show entries temporally near a reference entry")
nb.add_argument("entry_id", help="Reference entry ID (or prefix)")
nb.add_argument("-d", "--days", type=int, default=7, help="Window in days (default 7)")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
dispatch = {
"stats": cmd_stats,
"search": cmd_search,
"ingest": cmd_ingest,
"link": cmd_link,
"topics": cmd_topics,
"remove": cmd_remove,
"export": cmd_export,
"clusters": cmd_clusters,
"hubs": cmd_hubs,
"bridges": cmd_bridges,
"rebuild": cmd_rebuild,
"tag": cmd_tag,
"untag": cmd_untag,
"retag": cmd_retag,
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
}
dispatch[args.command](args)
if __name__ == "__main__":
main()