diff --git a/nexus/mnemosyne/FEATURES.yaml b/nexus/mnemosyne/FEATURES.yaml
index 5c3ae14c..e009669d 100644
--- a/nexus/mnemosyne/FEATURES.yaml
+++ b/nexus/mnemosyne/FEATURES.yaml
@@ -67,7 +67,7 @@ modules:
   cli:
     status: shipped
     files: [cli.py]
-    description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors
+    description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
 
   tests:
     status: shipped
@@ -182,9 +182,12 @@ planned:
       - "#TBD"  # Will be filled when PR is created
 
   memory_consolidation:
-    status: planned
+    status: shipped
+    files: [archive.py, cli.py, tests/test_consolidation.py]
     description: >
       Automatic merging of duplicate/near-duplicate memories using content_hash
       and semantic similarity. Periodic consolidation pass.
     priority: low
+    merged_prs:
+      - "#1260"
 
diff --git a/nexus/mnemosyne/archive.py b/nexus/mnemosyne/archive.py
index c2526bcd..16b145c2 100644
--- a/nexus/mnemosyne/archive.py
+++ b/nexus/mnemosyne/archive.py
@@ -938,6 +938,127 @@ class MnemosyneArchive:
             "vibrant_count": vibrant_count,
         }
 
+    def consolidate(
+        self,
+        threshold: float = 0.9,
+        dry_run: bool = False,
+    ) -> list[dict]:
+        """Scan the archive and merge duplicate/near-duplicate entries.
+
+        Two entries are considered duplicates if:
+
+        - They share the same ``content_hash`` (exact duplicate), or
+        - Their similarity score (via HolographicLinker) exceeds ``threshold``
+          (near-duplicate when an embedding backend is available or Jaccard is
+          high enough at the given threshold).
+
+        Merge strategy:
+
+        - Keep the *older* entry (earlier ``created_at``).
+        - Union topics from both entries (case-deduped).
+        - Merge metadata from newer into older (older values win on conflicts).
+        - Transfer all links from the newer entry to the older entry.
+        - Delete the newer entry.
+
+        Args:
+            threshold: Similarity threshold for near-duplicate detection (0.0–1.0).
+                Default 0.9 is intentionally conservative.
+            dry_run: If True, return the list of would-be merges without mutating
+                the archive.
+
+        Returns:
+            List of dicts, one per merged pair::
+
+                {
+                    "kept": <kept_entry_id>,
+                    "removed": <removed_entry_id>,
+                    "reason": "exact_hash" | "semantic_similarity",
+                    "score": float,  # 1.0 for exact hash matches
+                    "dry_run": bool,
+                }
+        """
+        merges: list[dict] = []
+        entries = list(self._entries.values())
+        removed_ids: set[str] = set()
+
+        for i, entry_a in enumerate(entries):
+            if entry_a.id in removed_ids:
+                continue
+            for entry_b in entries[i + 1:]:
+                if entry_b.id in removed_ids:
+                    continue
+
+                # Determine if they are duplicates
+                reason: Optional[str] = None
+                score: float = 0.0
+
+                if (
+                    entry_a.content_hash is not None
+                    and entry_b.content_hash is not None
+                    and entry_a.content_hash == entry_b.content_hash
+                ):
+                    reason = "exact_hash"
+                    score = 1.0
+                else:
+                    sim = self.linker.compute_similarity(entry_a, entry_b)
+                    if sim >= threshold:
+                        reason = "semantic_similarity"
+                        score = sim
+
+                if reason is None:
+                    continue
+
+                # Decide which entry to keep (older survives)
+                if entry_a.created_at <= entry_b.created_at:
+                    kept, removed = entry_a, entry_b
+                else:
+                    kept, removed = entry_b, entry_a
+
+                merges.append({
+                    "kept": kept.id,
+                    "removed": removed.id,
+                    "reason": reason,
+                    "score": round(score, 4),
+                    "dry_run": dry_run,
+                })
+
+                if not dry_run:
+                    # Merge topics (case-deduped)
+                    existing_lower = {t.lower() for t in kept.topics}
+                    for tag in removed.topics:
+                        if tag.lower() not in existing_lower:
+                            kept.topics.append(tag)
+                            existing_lower.add(tag.lower())
+
+                    # Merge metadata (kept wins on key conflicts)
+                    for k, v in removed.metadata.items():
+                        if k not in kept.metadata:
+                            kept.metadata[k] = v
+
+                    # Transfer links: add removed's links to kept
+                    kept_links_set = set(kept.links)
+                    for lid in removed.links:
+                        if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
+                            kept.links.append(lid)
+                            kept_links_set.add(lid)
+                            # Update the other entry's back-link
+                            other = self._entries.get(lid)
+                            if other and kept.id not in other.links:
+                                other.links.append(kept.id)
+
+                    # Remove back-links pointing at the removed entry
+                    for other in self._entries.values():
+                        if removed.id in other.links:
+                            other.links.remove(removed.id)
+                            if other.id != kept.id and kept.id not in other.links:
+                                other.links.append(kept.id)
+
+                    del self._entries[removed.id]
+                    removed_ids.add(removed.id)
+
+                    # If entry_a itself was the newer entry it has just been
+                    # removed; stop comparing it against later entries.
+                    if removed is entry_a:
+                        break
+
+        if not dry_run and merges:
+            self._save()
+
+        return merges
+
     def rebuild_links(self, threshold: Optional[float] = None) -> int:
         """Recompute all links from scratch.
 
diff --git a/nexus/mnemosyne/cli.py b/nexus/mnemosyne/cli.py
index af1823d0..66feb067 100644
--- a/nexus/mnemosyne/cli.py
+++ b/nexus/mnemosyne/cli.py
@@ -206,6 +206,23 @@ def cmd_timeline(args):
         print()
 
 
+def cmd_consolidate(args):
+    archive = MnemosyneArchive()
+    merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
+    if not merges:
+        print("No duplicates found.")
+        return
+    label = "[DRY RUN] " if args.dry_run else ""
+    for m in merges:
+        print(f"{label}Merge ({m['reason']}, score={m['score']:.4f}):")
+        print(f"  kept: {m['kept'][:8]}")
+        print(f"  removed: {m['removed'][:8]}")
+    if args.dry_run:
+        print(f"\n{len(merges)} pair(s) would be merged. Re-run without --dry-run to apply.")
+    else:
+        print(f"\nMerged {len(merges)} duplicate pair(s).")
+
+
 def cmd_neighbors(args):
     archive = MnemosyneArchive()
     try:
@@ -283,6 +300,10 @@ def main():
     nb.add_argument("entry_id", help="Anchor entry ID")
     nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
 
+    co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
+    co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
+    co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
+
     args = parser.parse_args()
     if not args.command:
         parser.print_help()
@@ -305,6 +326,7 @@
         "retag": cmd_retag,
         "timeline": cmd_timeline,
         "neighbors": cmd_neighbors,
+        "consolidate": cmd_consolidate,
     }
 
     dispatch[args.command](args)
diff --git a/nexus/mnemosyne/tests/test_consolidation.py b/nexus/mnemosyne/tests/test_consolidation.py
new file mode 100644
index 00000000..67a33295
--- /dev/null
+++ b/nexus/mnemosyne/tests/test_consolidation.py
@@ -0,0 +1,176 @@
+"""Tests for MnemosyneArchive.consolidate() — duplicate/near-duplicate merging."""
+
+import tempfile
+from pathlib import Path
+
+from nexus.mnemosyne.archive import MnemosyneArchive
+from nexus.mnemosyne.entry import ArchiveEntry
+from nexus.mnemosyne.ingest import ingest_event
+
+
+def _archive(tmp: str) -> MnemosyneArchive:
+    return MnemosyneArchive(archive_path=Path(tmp) / "archive.json", auto_embed=False)
+
+
+def test_consolidate_exact_duplicate_removed():
+    """Two entries with identical content_hash are merged; only one survives."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        e1 = ingest_event(archive, title="Hello world", content="Exactly the same content", topics=["a"])
+        # Manually add a second entry with the same hash to simulate a duplicate
+        e2 = ArchiveEntry(title="Hello world", content="Exactly the same content", topics=["b"])
+        # Bypass dedup guard so we can test consolidate() rather than add()
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        assert archive.count == 2
+        merges = archive.consolidate(dry_run=False)
+        assert len(merges) == 1
+        assert merges[0]["reason"] == "exact_hash"
+        assert merges[0]["score"] == 1.0
+        assert archive.count == 1
+
+
+def test_consolidate_keeps_older_entry():
+    """The older entry (earlier created_at) is kept, the newer is removed."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        e1 = ingest_event(archive, title="Hello world", content="Same content here", topics=[])
+        e2 = ArchiveEntry(title="Hello world", content="Same content here", topics=[])
+        # Make e2 clearly newer
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        merges = archive.consolidate(dry_run=False)
+        assert len(merges) == 1
+        assert merges[0]["kept"] == e1.id
+        assert merges[0]["removed"] == e2.id
+
+
+def test_consolidate_merges_topics():
+    """Topics from the removed entry are merged (unioned) into the kept entry."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        e1 = ingest_event(archive, title="Memory item", content="Shared content body", topics=["alpha"])
+        e2 = ArchiveEntry(title="Memory item", content="Shared content body", topics=["beta", "gamma"])
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        archive.consolidate(dry_run=False)
+        survivor = archive.get(e1.id)
+        assert survivor is not None
+        topic_lower = {t.lower() for t in survivor.topics}
+        assert "alpha" in topic_lower
+        assert "beta" in topic_lower
+        assert "gamma" in topic_lower
+
+
+def test_consolidate_merges_metadata():
+    """Metadata from the removed entry is merged into the kept entry; kept values win."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        e1 = ArchiveEntry(
+            title="Shared", content="Identical body here", topics=[], metadata={"k1": "v1", "shared": "kept"}
+        )
+        archive._entries[e1.id] = e1
+        e2 = ArchiveEntry(
+            title="Shared", content="Identical body here", topics=[], metadata={"k2": "v2", "shared": "removed"}
+        )
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        archive.consolidate(dry_run=False)
+        survivor = archive.get(e1.id)
+        assert survivor.metadata["k1"] == "v1"
+        assert survivor.metadata["k2"] == "v2"
+        assert survivor.metadata["shared"] == "kept"  # kept entry wins
+
+
+def test_consolidate_dry_run_no_mutation():
+    """Dry-run mode returns the merge plan but does not alter the archive."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        ingest_event(archive, title="Same", content="Identical content to dedup", topics=[])
+        e2 = ArchiveEntry(title="Same", content="Identical content to dedup", topics=[])
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        merges = archive.consolidate(dry_run=True)
+        assert len(merges) == 1
+        assert merges[0]["dry_run"] is True
+        # Archive must be unchanged
+        assert archive.count == 2
+
+
+def test_consolidate_no_duplicates():
+    """When no duplicates exist, consolidate returns an empty list."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        ingest_event(archive, title="Unique A", content="This is completely unique content for A")
+        ingest_event(archive, title="Unique B", content="Totally different words here for B")
+        merges = archive.consolidate(threshold=0.9)
+        assert merges == []
+
+
+def test_consolidate_transfers_links():
+    """Links from the removed entry are inherited by the kept entry."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        # Create a third entry to act as a link target
+        target = ingest_event(archive, title="Target", content="The link target entry", topics=[])
+
+        e1 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[])
+        archive._entries[e1.id] = e1
+
+        # Give the *newer* (soon-to-be-removed) entry the link so the test
+        # actually exercises link transfer rather than links the survivor
+        # already had.
+        e2 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[], links=[target.id])
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        target.links.append(e2.id)
+        archive._save()
+
+        archive.consolidate(dry_run=False)
+        survivor = archive.get(e1.id)
+        assert survivor is not None
+        assert target.id in survivor.links
+
+
+def test_consolidate_near_duplicate_semantic():
+    """Near-duplicate entries above the similarity threshold are merged."""
+    with tempfile.TemporaryDirectory() as tmp:
+        archive = _archive(tmp)
+        # Entries with very high Jaccard overlap
+        text_a = "python automation scripting building tools workflows"
+        text_b = "python automation scripting building tools workflows tasks"
+        e1 = ArchiveEntry(title="Automator", content=text_a, topics=[])
+        e2 = ArchiveEntry(title="Automator", content=text_b, topics=[])
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e1.id] = e1
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        # Use a low threshold to ensure these very similar entries match
+        merges = archive.consolidate(threshold=0.7, dry_run=False)
+        assert len(merges) >= 1
+        assert merges[0]["reason"] == "semantic_similarity"
+
+
+def test_consolidate_persists_after_reload():
+    """After consolidation, the reduced archive survives a save/reload cycle."""
+    with tempfile.TemporaryDirectory() as tmp:
+        path = Path(tmp) / "archive.json"
+        archive = MnemosyneArchive(archive_path=path, auto_embed=False)
+        ingest_event(archive, title="Persist test", content="Body to dedup and persist", topics=[])
+        e2 = ArchiveEntry(title="Persist test", content="Body to dedup and persist", topics=[])
+        e2.created_at = "2099-01-01T00:00:00+00:00"
+        archive._entries[e2.id] = e2
+        archive._save()
+
+        archive.consolidate(dry_run=False)
+        assert archive.count == 1
+
+        reloaded = MnemosyneArchive(archive_path=path, auto_embed=False)
+        assert reloaded.count == 1
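
A minimal usage sketch of the new consolidate() API, assuming the default
MnemosyneArchive() constructor (the same one cmd_consolidate uses) and an
archive that already contains duplicate or near-duplicate entries:

    from nexus.mnemosyne.archive import MnemosyneArchive

    archive = MnemosyneArchive()

    # Preview the merge plan; nothing is mutated in dry-run mode.
    plan = archive.consolidate(threshold=0.9, dry_run=True)
    for m in plan:
        print(f"{m['reason']} ({m['score']}): keep {m['kept'][:8]}, drop {m['removed'][:8]}")

    # Apply: older entries survive, topics/metadata/links are merged,
    # and the archive is saved to disk.
    merged = archive.consolidate(threshold=0.9, dry_run=False)
    print(f"Merged {len(merged)} duplicate pair(s)")

The same pass is exposed through the new consolidate subcommand in cli.py:
run it with --dry-run (optionally --threshold) to preview, then re-run
without --dry-run to apply.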