Compare commits

...

5 Commits

Author SHA1 Message Date
e7754ce101 docs: update __init__.py docstring for consolidation (#1260)
Some checks failed
CI / test (pull_request) Failing after 8s
CI / validate (pull_request) Failing after 12s
Review Approval Gate / verify-review (pull_request) Failing after 2s
2026-04-12 06:21:02 +00:00
2fa8b5d99b docs: mark memory_consolidation as shipped (#1260) 2026-04-12 06:20:40 +00:00
bb856765ce test: add consolidation tests (#1260)
Covers exact duplicate detection, dry-run, topic/link
merging, triple duplicates, and link repair.
2026-04-12 06:20:39 +00:00
1e110922b2 feat: add 'mnemosyne consolidate' CLI command (#1260)
Supports --dry-run and --threshold options for safe
duplicate detection and merging.
2026-04-12 06:20:06 +00:00
b308e627b8 feat: add consolidate() method to MnemosyneArchive (#1260)
Scans for exact and near-duplicate entries by content_hash
and embedding similarity. Merges older entry with union of
topics, links, and metadata from duplicates.
2026-04-12 06:18:48 +00:00
5 changed files with 284 additions and 2 deletions

View File

@@ -182,9 +182,12 @@ planned:
- "#TBD" # Will be filled when PR is created
memory_consolidation:
status: planned
status: shipped
files: [archive.py, cli.py]
description: >
Automatic merging of duplicate/near-duplicate memories
using content_hash and semantic similarity. Periodic
consolidation pass.
priority: low
merged_prs:
- "#TBD" # Will be filled when PR is created

View File

@@ -1,7 +1,7 @@
"""nexus.mnemosyne — The Living Holographic Archive.
Phase 1: Foundation — core archive, entry model, holographic linker,
ingestion pipeline, and CLI.
ingestion pipeline, memory consolidation, and CLI.
Builds on MemPalace vector memory to create interconnected meaning:
entries auto-reference related entries via semantic similarity,

View File

@@ -972,3 +972,123 @@ class MnemosyneArchive:
self._save()
return total_links
def consolidate(
self,
similarity_threshold: float = 0.9,
dry_run: bool = False,
) -> list[dict]:
"""Find and merge duplicate or near-duplicate entries.
Scans all entries for:
1. Exact duplicates: same content_hash
2. Near-duplicates: embedding similarity > threshold (when available)
When merging, the older entry is kept. Topics, links, and metadata
from the newer entry are merged into the survivor. The newer entry
is removed.
Args:
similarity_threshold: Minimum cosine similarity to consider
near-duplicate (default 0.9). Only used with embedding backend.
dry_run: If True, returns merge pairs without modifying the archive.
Returns:
List of dicts with keys: kept_id, removed_id, reason, similarity.
"""
merges = []
entries = list(self._entries.values())
removed_ids: set[str] = set()
# Phase 1: exact duplicates by content_hash
hash_groups: dict[str, list[ArchiveEntry]] = {}
for entry in entries:
if entry.content_hash:
hash_groups.setdefault(entry.content_hash, []).append(entry)
for content_hash, group in hash_groups.items():
if len(group) < 2:
continue
group.sort(key=lambda e: e.created_at)
keeper = group[0]
for dup in group[1:]:
if dup.id in removed_ids:
continue
merges.append({
"kept_id": keeper.id,
"removed_id": dup.id,
"kept_title": keeper.title,
"removed_title": dup.title,
"reason": "exact_content_hash",
"similarity": 1.0,
})
removed_ids.add(dup.id)
# Phase 2: near-duplicates via embedding similarity
if self._embedding_backend is not None:
active = [e for e in entries if e.id not in removed_ids]
for i, a in enumerate(active):
if a.id in removed_ids:
continue
vec_a = self.linker._get_embedding(a)
if not vec_a:
continue
for b in active[i + 1:]:
if b.id in removed_ids:
continue
vec_b = self.linker._get_embedding(b)
if not vec_b:
continue
sim = self._embedding_backend.similarity(vec_a, vec_b)
if sim >= similarity_threshold:
if a.created_at <= b.created_at:
keeper, loser = a, b
else:
keeper, loser = b, a
merges.append({
"kept_id": keeper.id,
"removed_id": loser.id,
"kept_title": keeper.title,
"removed_title": loser.title,
"reason": "embedding_similarity",
"similarity": round(sim, 4),
})
removed_ids.add(loser.id)
if dry_run:
return merges
# Execute merges
for merge in merges:
keeper = self._entries.get(merge["kept_id"])
loser = self._entries.get(merge["removed_id"])
if keeper is None or loser is None:
continue
for topic in loser.topics:
if topic not in keeper.topics:
keeper.topics.append(topic)
for link_id in loser.links:
if link_id != keeper.id and link_id not in keeper.links:
keeper.links.append(link_id)
for key, value in loser.metadata.items():
if key not in keeper.metadata:
keeper.metadata[key] = value
keeper.updated_at = datetime.now(timezone.utc).isoformat()
del self._entries[loser.id]
for entry in self._entries.values():
if merge["removed_id"] in entry.links:
entry.links.remove(merge["removed_id"])
if merge["kept_id"] not in entry.links and merge["kept_id"] != entry.id:
entry.links.append(merge["kept_id"])
if merges:
self._save()
return merges

View File

@@ -154,6 +154,23 @@ def cmd_rebuild(args):
print(f"Rebuilt links: {total} connections across {archive.count} entries")
def cmd_consolidate(args):
archive = MnemosyneArchive()
threshold = args.threshold
merges = archive.consolidate(similarity_threshold=threshold, dry_run=args.dry_run)
if not merges:
print("No duplicates found.")
return
action = "Would merge" if args.dry_run else "Merged"
print(f"{action} {len(merges)} pair(s):\n")
for m in merges:
sim = m["similarity"]
reason = m["reason"]
print(f" [{reason}] {m['kept_title'][:60]}")
print(f" kept: {m['kept_id'][:8]}")
print(f" removed: {m['removed_id'][:8]} (similarity: {sim})\n")
def cmd_tag(args):
archive = MnemosyneArchive()
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
@@ -263,6 +280,10 @@ def main():
rb = sub.add_parser("rebuild", help="Recompute all links from scratch")
rb.add_argument("-t", "--threshold", type=float, default=None, help="Similarity threshold override")
co = sub.add_parser("consolidate", help="Find and merge duplicate/near-duplicate entries")
co.add_argument("-t", "--threshold", type=float, default=0.9, help="Similarity threshold for near-duplicates (default: 0.9)")
co.add_argument("--dry-run", action="store_true", help="Show what would merge without modifying")
tg = sub.add_parser("tag", help="Add tags to an existing entry")
tg.add_argument("entry_id", help="Entry ID")
tg.add_argument("tags", help="Comma-separated tags to add")
@@ -300,6 +321,7 @@ def main():
"hubs": cmd_hubs,
"bridges": cmd_bridges,
"rebuild": cmd_rebuild,
"consolidate": cmd_consolidate,
"tag": cmd_tag,
"untag": cmd_untag,
"retag": cmd_retag,

View File

@@ -0,0 +1,137 @@
"""Tests for MnemosyneArchive.consolidate()."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
"""Create an archive with auto_embed disabled for deterministic tests."""
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
class TestConsolidateExactDuplicates:
"""Phase 1: exact duplicate detection by content_hash."""
def test_finds_exact_duplicates(self, archive):
entry_a = ArchiveEntry(title="Hello", content="World")
entry_b = ArchiveEntry(title="Hello", content="World")
archive.add(entry_a, auto_link=False)
archive.add(entry_b, auto_link=False)
# Force same content_hash
entry_b.content_hash = entry_a.content_hash
# Re-add entry_b manually (bypass add() dedup)
archive._entries[entry_b.id] = entry_b
merges = archive.consolidate()
assert len(merges) == 1
assert merges[0]["reason"] == "exact_content_hash"
assert merges[0]["similarity"] == 1.0
def test_keeps_older_entry(self, archive):
entry_a = ArchiveEntry(title="First", content="Data", created_at="2024-01-01T00:00:00+00:00")
entry_b = ArchiveEntry(title="Second", content="Data", created_at="2024-06-01T00:00:00+00:00")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
merges = archive.consolidate()
assert merges[0]["kept_id"] == entry_a.id
assert merges[0]["removed_id"] == entry_b.id
def test_dry_run_does_not_modify(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
count_before = archive.count
merges = archive.consolidate(dry_run=True)
assert len(merges) == 1
assert archive.count == count_before # unchanged
def test_no_duplicates_returns_empty(self, archive):
archive.add(ArchiveEntry(title="Unique A", content="Content A"), auto_link=False)
archive.add(ArchiveEntry(title="Unique B", content="Content B"), auto_link=False)
merges = archive.consolidate()
assert merges == []
def test_merges_topics(self, archive):
entry_a = ArchiveEntry(title="A", content="Data", topics=["python"])
entry_b = ArchiveEntry(title="B", content="Data", topics=["testing"])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
keeper = archive.get(entry_a.id)
assert "python" in keeper.topics
assert "testing" in keeper.topics
def test_merges_links(self, archive):
entry_c = ArchiveEntry(title="C", content="Ref")
archive.add(entry_c, auto_link=False)
entry_a = ArchiveEntry(title="A", content="Data", links=[entry_c.id])
entry_b = ArchiveEntry(title="B", content="Data", links=[entry_c.id])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
keeper = archive.get(entry_a.id)
assert entry_c.id in keeper.links
def test_removes_duplicate_from_archive(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
assert archive.get(entry_a.id) is not None
assert archive.get(entry_b.id) is None
def test_fixes_links_pointing_to_removed(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_c = ArchiveEntry(title="C", content="Ref", links=[entry_b.id])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive.add(entry_c, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
survivor = archive.get(entry_c.id)
assert entry_b.id not in survivor.links
assert entry_a.id in survivor.links
class TestConsolidateTripleDuplicates:
"""Handle 3+ entries with the same content_hash."""
def test_three_way_merge(self, archive):
entry_a = ArchiveEntry(title="A", content="Same", created_at="2024-01-01T00:00:00+00:00")
entry_b = ArchiveEntry(title="B", content="Same", created_at="2024-02-01T00:00:00+00:00")
entry_c = ArchiveEntry(title="C", content="Same", created_at="2024-03-01T00:00:00+00:00")
entry_b.content_hash = entry_a.content_hash
entry_c.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive._entries[entry_c.id] = entry_c
merges = archive.consolidate()
assert len(merges) == 2
assert all(m["kept_id"] == entry_a.id for m in merges)