This commit was merged in pull request #1262.
This commit is contained in:
@@ -67,7 +67,7 @@ modules:
|
||||
cli:
|
||||
status: shipped
|
||||
files: [cli.py]
|
||||
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors
|
||||
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
|
||||
|
||||
tests:
|
||||
status: shipped
|
||||
@@ -182,9 +182,12 @@ planned:
|
||||
- "#TBD" # Will be filled when PR is created
|
||||
|
||||
memory_consolidation:
|
||||
status: planned
|
||||
status: shipped
|
||||
files: [archive.py, cli.py, tests/test_consolidation.py]
|
||||
description: >
|
||||
Automatic merging of duplicate/near-duplicate memories
|
||||
using content_hash and semantic similarity. Periodic
|
||||
consolidation pass.
|
||||
priority: low
|
||||
merged_prs:
|
||||
- "#1260"
|
||||
|
||||
@@ -938,6 +938,127 @@ class MnemosyneArchive:
|
||||
"vibrant_count": vibrant_count,
|
||||
}
|
||||
|
||||
def consolidate(
|
||||
self,
|
||||
threshold: float = 0.9,
|
||||
dry_run: bool = False,
|
||||
) -> list[dict]:
|
||||
"""Scan the archive and merge duplicate/near-duplicate entries.
|
||||
|
||||
Two entries are considered duplicates if:
|
||||
- They share the same ``content_hash`` (exact duplicate), or
|
||||
- Their similarity score (via HolographicLinker) exceeds ``threshold``
|
||||
(near-duplicate when an embedding backend is available or Jaccard is
|
||||
high enough at the given threshold).
|
||||
|
||||
Merge strategy:
|
||||
- Keep the *older* entry (earlier ``created_at``).
|
||||
- Union topics from both entries (case-deduped).
|
||||
- Merge metadata from newer into older (older values win on conflicts).
|
||||
- Transfer all links from the newer entry to the older entry.
|
||||
- Delete the newer entry.
|
||||
|
||||
Args:
|
||||
threshold: Similarity threshold for near-duplicate detection (0.0–1.0).
|
||||
Default 0.9 is intentionally conservative.
|
||||
dry_run: If True, return the list of would-be merges without mutating
|
||||
the archive.
|
||||
|
||||
Returns:
|
||||
List of dicts, one per merged pair::
|
||||
|
||||
{
|
||||
"kept": <entry_id of survivor>,
|
||||
"removed": <entry_id of duplicate>,
|
||||
"reason": "exact_hash" | "semantic_similarity",
|
||||
"score": float, # 1.0 for exact hash matches
|
||||
"dry_run": bool,
|
||||
}
|
||||
"""
|
||||
merges: list[dict] = []
|
||||
entries = list(self._entries.values())
|
||||
removed_ids: set[str] = set()
|
||||
|
||||
for i, entry_a in enumerate(entries):
|
||||
if entry_a.id in removed_ids:
|
||||
continue
|
||||
for entry_b in entries[i + 1:]:
|
||||
if entry_b.id in removed_ids:
|
||||
continue
|
||||
|
||||
# Determine if they are duplicates
|
||||
reason: Optional[str] = None
|
||||
score: float = 0.0
|
||||
|
||||
if (
|
||||
entry_a.content_hash is not None
|
||||
and entry_b.content_hash is not None
|
||||
and entry_a.content_hash == entry_b.content_hash
|
||||
):
|
||||
reason = "exact_hash"
|
||||
score = 1.0
|
||||
else:
|
||||
sim = self.linker.compute_similarity(entry_a, entry_b)
|
||||
if sim >= threshold:
|
||||
reason = "semantic_similarity"
|
||||
score = sim
|
||||
|
||||
if reason is None:
|
||||
continue
|
||||
|
||||
# Decide which entry to keep (older survives)
|
||||
if entry_a.created_at <= entry_b.created_at:
|
||||
kept, removed = entry_a, entry_b
|
||||
else:
|
||||
kept, removed = entry_b, entry_a
|
||||
|
||||
merges.append({
|
||||
"kept": kept.id,
|
||||
"removed": removed.id,
|
||||
"reason": reason,
|
||||
"score": round(score, 4),
|
||||
"dry_run": dry_run,
|
||||
})
|
||||
|
||||
if not dry_run:
|
||||
# Merge topics (case-deduped)
|
||||
existing_lower = {t.lower() for t in kept.topics}
|
||||
for tag in removed.topics:
|
||||
if tag.lower() not in existing_lower:
|
||||
kept.topics.append(tag)
|
||||
existing_lower.add(tag.lower())
|
||||
|
||||
# Merge metadata (kept wins on key conflicts)
|
||||
for k, v in removed.metadata.items():
|
||||
if k not in kept.metadata:
|
||||
kept.metadata[k] = v
|
||||
|
||||
# Transfer links: add removed's links to kept
|
||||
kept_links_set = set(kept.links)
|
||||
for lid in removed.links:
|
||||
if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
|
||||
kept.links.append(lid)
|
||||
kept_links_set.add(lid)
|
||||
# Update the other entry's back-link
|
||||
other = self._entries.get(lid)
|
||||
if other and kept.id not in other.links:
|
||||
other.links.append(kept.id)
|
||||
|
||||
# Remove back-links pointing at the removed entry
|
||||
for other in self._entries.values():
|
||||
if removed.id in other.links:
|
||||
other.links.remove(removed.id)
|
||||
if other.id != kept.id and kept.id not in other.links:
|
||||
other.links.append(kept.id)
|
||||
|
||||
del self._entries[removed.id]
|
||||
removed_ids.add(removed.id)
|
||||
|
||||
if not dry_run and merges:
|
||||
self._save()
|
||||
|
||||
return merges
|
||||
|
||||
def rebuild_links(self, threshold: Optional[float] = None) -> int:
|
||||
"""Recompute all links from scratch.
|
||||
|
||||
|
||||
@@ -206,6 +206,23 @@ def cmd_timeline(args):
|
||||
print()
|
||||
|
||||
|
||||
def cmd_consolidate(args):
|
||||
archive = MnemosyneArchive()
|
||||
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
|
||||
if not merges:
|
||||
print("No duplicates found.")
|
||||
return
|
||||
label = "[DRY RUN] " if args.dry_run else ""
|
||||
for m in merges:
|
||||
print(f"{label}Merge ({m['reason']}, score={m['score']:.4f}):")
|
||||
print(f" kept: {m['kept'][:8]}")
|
||||
print(f" removed: {m['removed'][:8]}")
|
||||
if args.dry_run:
|
||||
print(f"\n{len(merges)} pair(s) would be merged. Re-run without --dry-run to apply.")
|
||||
else:
|
||||
print(f"\nMerged {len(merges)} duplicate pair(s).")
|
||||
|
||||
|
||||
def cmd_neighbors(args):
|
||||
archive = MnemosyneArchive()
|
||||
try:
|
||||
@@ -283,6 +300,10 @@ def main():
|
||||
nb.add_argument("entry_id", help="Anchor entry ID")
|
||||
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
|
||||
|
||||
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
|
||||
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
|
||||
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
|
||||
|
||||
args = parser.parse_args()
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
@@ -305,6 +326,7 @@ def main():
|
||||
"retag": cmd_retag,
|
||||
"timeline": cmd_timeline,
|
||||
"neighbors": cmd_neighbors,
|
||||
"consolidate": cmd_consolidate,
|
||||
}
|
||||
dispatch[args.command](args)
|
||||
|
||||
|
||||
176
nexus/mnemosyne/tests/test_consolidation.py
Normal file
176
nexus/mnemosyne/tests/test_consolidation.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""Tests for MnemosyneArchive.consolidate() — duplicate/near-duplicate merging."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
|
||||
|
||||
def _archive(tmp: str) -> MnemosyneArchive:
|
||||
return MnemosyneArchive(archive_path=Path(tmp) / "archive.json", auto_embed=False)
|
||||
|
||||
|
||||
def test_consolidate_exact_duplicate_removed():
|
||||
"""Two entries with identical content_hash are merged; only one survives."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
e1 = ingest_event(archive, title="Hello world", content="Exactly the same content", topics=["a"])
|
||||
# Manually add a second entry with the same hash to simulate a duplicate
|
||||
e2 = ArchiveEntry(title="Hello world", content="Exactly the same content", topics=["b"])
|
||||
# Bypass dedup guard so we can test consolidate() rather than add()
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
assert archive.count == 2
|
||||
merges = archive.consolidate(dry_run=False)
|
||||
assert len(merges) == 1
|
||||
assert merges[0]["reason"] == "exact_hash"
|
||||
assert merges[0]["score"] == 1.0
|
||||
assert archive.count == 1
|
||||
|
||||
|
||||
def test_consolidate_keeps_older_entry():
|
||||
"""The older entry (earlier created_at) is kept, the newer is removed."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
e1 = ingest_event(archive, title="Hello world", content="Same content here", topics=[])
|
||||
e2 = ArchiveEntry(title="Hello world", content="Same content here", topics=[])
|
||||
# Make e2 clearly newer
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
merges = archive.consolidate(dry_run=False)
|
||||
assert len(merges) == 1
|
||||
assert merges[0]["kept"] == e1.id
|
||||
assert merges[0]["removed"] == e2.id
|
||||
|
||||
|
||||
def test_consolidate_merges_topics():
|
||||
"""Topics from the removed entry are merged (unioned) into the kept entry."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
e1 = ingest_event(archive, title="Memory item", content="Shared content body", topics=["alpha"])
|
||||
e2 = ArchiveEntry(title="Memory item", content="Shared content body", topics=["beta", "gamma"])
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
archive.consolidate(dry_run=False)
|
||||
survivor = archive.get(e1.id)
|
||||
assert survivor is not None
|
||||
topic_lower = {t.lower() for t in survivor.topics}
|
||||
assert "alpha" in topic_lower
|
||||
assert "beta" in topic_lower
|
||||
assert "gamma" in topic_lower
|
||||
|
||||
|
||||
def test_consolidate_merges_metadata():
|
||||
"""Metadata from the removed entry is merged into the kept entry; kept values win."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
e1 = ArchiveEntry(
|
||||
title="Shared", content="Identical body here", topics=[], metadata={"k1": "v1", "shared": "kept"}
|
||||
)
|
||||
archive._entries[e1.id] = e1
|
||||
e2 = ArchiveEntry(
|
||||
title="Shared", content="Identical body here", topics=[], metadata={"k2": "v2", "shared": "removed"}
|
||||
)
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
archive.consolidate(dry_run=False)
|
||||
survivor = archive.get(e1.id)
|
||||
assert survivor.metadata["k1"] == "v1"
|
||||
assert survivor.metadata["k2"] == "v2"
|
||||
assert survivor.metadata["shared"] == "kept" # kept entry wins
|
||||
|
||||
|
||||
def test_consolidate_dry_run_no_mutation():
|
||||
"""Dry-run mode returns merge plan but does not alter the archive."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
ingest_event(archive, title="Same", content="Identical content to dedup", topics=[])
|
||||
e2 = ArchiveEntry(title="Same", content="Identical content to dedup", topics=[])
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
merges = archive.consolidate(dry_run=True)
|
||||
assert len(merges) == 1
|
||||
assert merges[0]["dry_run"] is True
|
||||
# Archive must be unchanged
|
||||
assert archive.count == 2
|
||||
|
||||
|
||||
def test_consolidate_no_duplicates():
|
||||
"""When no duplicates exist, consolidate returns an empty list."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
ingest_event(archive, title="Unique A", content="This is completely unique content for A")
|
||||
ingest_event(archive, title="Unique B", content="Totally different words here for B")
|
||||
merges = archive.consolidate(threshold=0.9)
|
||||
assert merges == []
|
||||
|
||||
|
||||
def test_consolidate_transfers_links():
|
||||
"""Links from the removed entry are inherited by the kept entry."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
# Create a third entry to act as a link target
|
||||
target = ingest_event(archive, title="Target", content="The link target entry", topics=[])
|
||||
|
||||
e1 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[], links=[target.id])
|
||||
archive._entries[e1.id] = e1
|
||||
target.links.append(e1.id)
|
||||
|
||||
e2 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[])
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
archive.consolidate(dry_run=False)
|
||||
survivor = archive.get(e1.id)
|
||||
assert survivor is not None
|
||||
assert target.id in survivor.links
|
||||
|
||||
|
||||
def test_consolidate_near_duplicate_semantic():
|
||||
"""Near-duplicate entries above the similarity threshold are merged."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
archive = _archive(tmp)
|
||||
# Entries with very high Jaccard overlap
|
||||
text_a = "python automation scripting building tools workflows"
|
||||
text_b = "python automation scripting building tools workflows tasks"
|
||||
e1 = ArchiveEntry(title="Automator", content=text_a, topics=[])
|
||||
e2 = ArchiveEntry(title="Automator", content=text_b, topics=[])
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e1.id] = e1
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
# Use a low threshold to ensure these very similar entries match
|
||||
merges = archive.consolidate(threshold=0.7, dry_run=False)
|
||||
assert len(merges) >= 1
|
||||
assert merges[0]["reason"] == "semantic_similarity"
|
||||
|
||||
|
||||
def test_consolidate_persists_after_reload():
|
||||
"""After consolidation, the reduced archive survives a save/reload cycle."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = Path(tmp) / "archive.json"
|
||||
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
|
||||
ingest_event(archive, title="Persist test", content="Body to dedup and persist", topics=[])
|
||||
e2 = ArchiveEntry(title="Persist test", content="Body to dedup and persist", topics=[])
|
||||
e2.created_at = "2099-01-01T00:00:00+00:00"
|
||||
archive._entries[e2.id] = e2
|
||||
archive._save()
|
||||
|
||||
archive.consolidate(dry_run=False)
|
||||
assert archive.count == 1
|
||||
|
||||
reloaded = MnemosyneArchive(archive_path=path, auto_embed=False)
|
||||
assert reloaded.count == 1
|
||||
Reference in New Issue
Block a user