Compare commits

...

3 Commits

Author SHA1 Message Date
e04f53b97d test(mnemosyne): add tag management tests
Some checks failed
CI / test (pull_request) Failing after 10s
CI / validate (pull_request) Failing after 15s
Review Approval Gate / verify-review (pull_request) Failing after 3s
Covers add_tags, remove_tags, retag with:
- deduplication
- case-insensitive matching
- missing entry errors
- empty tag lists
- persistence on reload

Part of #1236.
2026-04-11 23:26:01 +00:00
196e1e07fe feat(mnemosyne): add tag, untag, retag CLI commands
Part of #1236.
2026-04-11 23:25:19 +00:00
f87b4e4d7a feat(mnemosyne): add tag management — add_tags, remove_tags, retag
Closes #1236.

Three new methods on MnemosyneArchive:
- add_tags: add new tags (dedup, case-insensitive)
- remove_tags: remove specific tags
- retag: replace all tags at once

All methods return the entry's final topic list.
2026-04-11 23:24:59 +00:00
3 changed files with 253 additions and 1 deletions

View File

@@ -212,6 +212,79 @@ class MnemosyneArchive:
def count(self) -> int:
return len(self._entries)
def add_tags(self, entry_id: str, tags: list[str]) -> list[str]:
"""Add tags to an existing entry. Returns the entry's full topic list after addition.
Args:
entry_id: The entry ID to modify.
tags: List of tags to add (duplicates are ignored).
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(f"Entry not found: {entry_id}")
existing_lower = {t.lower() for t in entry.topics}
for tag in tags:
tag = tag.strip()
if tag and tag.lower() not in existing_lower:
entry.topics.append(tag)
existing_lower.add(tag.lower())
self._save()
return entry.topics
def remove_tags(self, entry_id: str, tags: list[str]) -> list[str]:
"""Remove tags from an existing entry. Returns the entry's topic list after removal.
Args:
entry_id: The entry ID to modify.
tags: List of tags to remove (case-insensitive).
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(f"Entry not found: {entry_id}")
remove_lower = {t.strip().lower() for t in tags}
entry.topics = [t for t in entry.topics if t.lower() not in remove_lower]
self._save()
return entry.topics
def retag(self, entry_id: str, tags: list[str]) -> list[str]:
"""Replace all tags on an existing entry. Returns the new topic list.
Args:
entry_id: The entry ID to modify.
tags: The new complete tag list (replaces all existing tags).
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(f"Entry not found: {entry_id}")
# Deduplicate, preserve order
seen: set[str] = set()
deduped: list[str] = []
for tag in tags:
tag = tag.strip()
if tag and tag.lower() not in seen:
deduped.append(tag)
seen.add(tag.lower())
entry.topics = deduped
self._save()
return entry.topics
def graph_data(
self,
topic_filter: Optional[str] = None,

View File

@@ -2,7 +2,8 @@
Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag
"""
from __future__ import annotations
@@ -143,6 +144,41 @@ def cmd_rebuild(args):
print(f"Rebuilt links: {total} connections across {archive.count} entries")
def cmd_tag(args):
archive = MnemosyneArchive()
try:
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
topics = archive.add_tags(args.entry_id, tags)
print(f"Tags added. Topics: {', '.join(topics)}")
except KeyError as e:
print(str(e))
sys.exit(1)
def cmd_untag(args):
archive = MnemosyneArchive()
try:
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
topics = archive.remove_tags(args.entry_id, tags)
print(f"Tags removed. Topics: {', '.join(topics) if topics else '(none)'}")
except KeyError as e:
print(str(e))
sys.exit(1)
def cmd_retag(args):
archive = MnemosyneArchive()
try:
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
topics = archive.retag(args.entry_id, tags)
print(f"Retagged. Topics: {', '.join(topics) if topics else '(none)'}")
except KeyError as e:
print(str(e))
sys.exit(1)
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -184,6 +220,18 @@ def main():
rb = sub.add_parser("rebuild", help="Recompute all links from scratch")
rb.add_argument("-t", "--threshold", type=float, default=None, help="Similarity threshold override")
tg = sub.add_parser("tag", help="Add tags to an entry")
tg.add_argument("entry_id", help="Entry ID (or prefix)")
tg.add_argument("tags", help="Comma-separated tags to add")
ut = sub.add_parser("untag", help="Remove tags from an entry")
ut.add_argument("entry_id", help="Entry ID (or prefix)")
ut.add_argument("tags", help="Comma-separated tags to remove")
rt = sub.add_parser("retag", help="Replace all tags on an entry")
rt.add_argument("entry_id", help="Entry ID (or prefix)")
rt.add_argument("tags", help="Comma-separated new tags")
args = parser.parse_args()
if not args.command:
parser.print_help()
@@ -201,6 +249,9 @@ def main():
"hubs": cmd_hubs,
"bridges": cmd_bridges,
"rebuild": cmd_rebuild,
"tag": cmd_tag,
"untag": cmd_untag,
"retag": cmd_retag,
}
dispatch[args.command](args)

View File

@@ -343,3 +343,131 @@ def test_archive_topic_counts():
assert counts["automation"] == 2
# sorted by count desc — both tied but must be present
assert set(counts.keys()) == {"python", "automation"}
# ─── Tag Management Tests ───────────────────────────────
def test_add_tags():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["existing"])
topics = archive.add_tags(entry.id, ["new_tag", "another"])
assert "existing" in topics
assert "new_tag" in topics
assert "another" in topics
assert len(topics) == 3
def test_add_tags_dedup():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["existing"])
topics = archive.add_tags(entry.id, ["existing", "Existing"])
assert topics.count("existing") == 1 # original preserved
assert len(topics) == 1 # no duplicate added
def test_add_tags_missing_entry():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
try:
archive.add_tags("nonexistent", ["tag"])
assert False, "Should have raised KeyError"
except KeyError:
pass
def test_remove_tags():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["alpha", "beta", "gamma"])
topics = archive.remove_tags(entry.id, ["beta"])
assert "alpha" in topics
assert "beta" not in topics
assert "gamma" in topics
assert len(topics) == 2
def test_remove_tags_case_insensitive():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["Alpha", "Beta"])
topics = archive.remove_tags(entry.id, ["alpha"])
assert "Beta" in topics
assert len(topics) == 1
def test_remove_tags_missing_entry():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
try:
archive.remove_tags("nonexistent", ["tag"])
assert False, "Should have raised KeyError"
except KeyError:
pass
def test_retag():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["old1", "old2"])
topics = archive.retag(entry.id, ["new1", "new2", "new3"])
assert topics == ["new1", "new2", "new3"]
assert "old1" not in topics
def test_retag_dedup():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["old"])
topics = archive.retag(entry.id, ["tag", "TAG", "Tag"])
assert topics == ["tag"] # deduped, first one kept
def test_retag_empty():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["old"])
topics = archive.retag(entry.id, [])
assert topics == []
def test_retag_missing_entry():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
try:
archive.retag("nonexistent", ["tag"])
assert False, "Should have raised KeyError"
except KeyError:
pass
def test_tags_persist_on_reload():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path)
entry = ingest_event(archive, title="Test", content="hello", topics=["original"])
archive.add_tags(entry.id, ["added"])
# Reload from disk
archive2 = MnemosyneArchive(archive_path=path)
reloaded = archive2.get(entry.id)
assert "original" in reloaded.topics
assert "added" in reloaded.topics