feat(mnemosyne): entry update, content dedup, find_duplicates #1240

Closed
Rockachopa wants to merge 5 commits from feat/mnemosyne-entry-update-dedup into main
12 changed files with 303 additions and 3 deletions

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@ mempalace/__pycache__/
# Prevent agents from writing to wrong path (see issue #1145)
public/nexus/
test-screenshots/
__pycache__/

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -49,8 +49,22 @@ class MnemosyneArchive:
with open(self.path, "w") as f:
json.dump(data, f, indent=2)
def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
"""Add an entry to the archive. Auto-links to related entries."""
def add(self, entry: ArchiveEntry, auto_link: bool = True, skip_dups: bool = False) -> ArchiveEntry:
"""Add an entry to the archive. Auto-links to related entries.
Args:
entry: The entry to add.
auto_link: Whether to automatically compute holographic links.
skip_dups: If True, return existing entry instead of adding a duplicate
(same title+content hash).
Returns:
The added (or existing, if skip_dups=True and duplicate found) entry.
"""
if skip_dups:
existing = self.find_by_hash(entry.content_hash)
if existing:
return existing
self._entries[entry.id] = entry
if auto_link:
self.linker.apply_links(entry, list(self._entries.values()))
@@ -581,6 +595,83 @@ class MnemosyneArchive:
self._save()
return entry
def update_entry(
    self,
    entry_id: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    metadata: Optional[dict] = None,
    re_link: bool = True,
) -> ArchiveEntry:
    """Apply a partial update to an existing entry.

    Fields passed as None keep their current values. The entry's
    updated_at timestamp is always bumped. When the title+content hash
    changes and re_link is True, holographic links are rebuilt for this
    entry (note: the hash covers both title and content, so a
    title-only change also triggers re-linking).

    Args:
        entry_id: ID of the entry to update.
        title: Replacement title, or None to keep the current one.
        content: Replacement content, or None to keep the current one.
        metadata: Replacement metadata dict; pass {} to clear, None to keep.
        re_link: Recompute holographic links when the hash changed.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If no entry with entry_id exists.
    """
    if entry_id not in self._entries:
        raise KeyError(entry_id)
    entry = self._entries[entry_id]
    previous_hash = entry.content_hash
    for attr, value in (("title", title), ("content", content), ("metadata", metadata)):
        if value is not None:
            setattr(entry, attr, value)
    entry.touch()
    if re_link and entry.content_hash != previous_hash:
        # Drop stale back-references from every other entry and reset
        # this entry's own links before letting the linker rebuild them.
        for other in self._entries.values():
            if entry_id in other.links:
                other.links.remove(entry_id)
        entry.links = []
        self.linker.apply_links(entry, list(self._entries.values()))
    self._save()
    return entry
def find_by_hash(self, content_hash: str) -> Optional[ArchiveEntry]:
    """Look up an entry by its content hash (title + content SHA-256).

    Returns the first entry whose hash matches, or None when no entry
    carries this hash.
    """
    matches = (e for e in self._entries.values() if e.content_hash == content_hash)
    return next(matches, None)
def find_duplicates(self) -> list[list[ArchiveEntry]]:
    """Group entries that share an identical content hash.

    Returns:
        A list of groups, each a list of 2+ entries with the same
        title+content, ordered by group size, largest first.
    """
    by_hash: dict[str, list[ArchiveEntry]] = {}
    for candidate in self._entries.values():
        by_hash.setdefault(candidate.content_hash, []).append(candidate)
    groups = [g for g in by_hash.values() if len(g) >= 2]
    return sorted(groups, key=len, reverse=True)
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -6,6 +6,7 @@ with metadata, content, and links to related entries.
from __future__ import annotations
import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
@@ -24,8 +25,19 @@ class ArchiveEntry:
topics: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
updated_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
links: list[str] = field(default_factory=list) # IDs of related entries
@property
def content_hash(self) -> str:
    """SHA-256 hex digest of title + NUL + content, used for dedup.

    The NUL separator keeps ("ab", "c") distinct from ("a", "bc").
    """
    payload = "\x00".join((self.title, self.content)).encode()
    return hashlib.sha256(payload).hexdigest()
def touch(self):
    """Set updated_at to the current UTC time (ISO-8601 string)."""
    now = datetime.now(timezone.utc)
    self.updated_at = now.isoformat()
def to_dict(self) -> dict:
return {
"id": self.id,
@@ -36,9 +48,16 @@ class ArchiveEntry:
"topics": self.topics,
"metadata": self.metadata,
"created_at": self.created_at,
"updated_at": self.updated_at,
"links": self.links,
"content_hash": self.content_hash,
}
@classmethod
def from_dict(cls, data: dict) -> ArchiveEntry:
    """Build an ArchiveEntry from a serialized dict.

    Keys that are not dataclass fields (e.g. the derived content_hash)
    are dropped. Legacy records lacking updated_at inherit created_at,
    falling back to the current UTC time if that is missing too.
    """
    known = cls.__dataclass_fields__
    kwargs = {key: val for key, val in data.items() if key in known}
    if "updated_at" not in kwargs:
        kwargs["updated_at"] = kwargs.get(
            "created_at", datetime.now(timezone.utc).isoformat()
        )
    return cls(**kwargs)

View File

@@ -491,3 +491,192 @@ def test_tag_persistence_across_reload():
fresh = a2.get(e.id)
assert "beta" in fresh.topics
assert "alpha" not in fresh.topics
# --- Entry update + dedup tests ---
def test_content_hash_deterministic():
    # Two entries with identical title+content must hash identically.
    first = ArchiveEntry(title="Test", content="Hello")
    second = ArchiveEntry(title="Test", content="Hello")
    assert first.content_hash == second.content_hash
def test_content_hash_differs_on_change():
    # Mutating content must be reflected in the hash.
    entry = ArchiveEntry(title="Test", content="Hello")
    original = entry.content_hash
    entry.content = "World"
    assert entry.content_hash != original
def test_updated_at_set_on_creation():
    # A fresh entry gets an updated_at no earlier than created_at.
    entry = ArchiveEntry(title="T", content="c")
    assert entry.updated_at is not None
    assert entry.updated_at >= entry.created_at
def test_touch_updates_timestamp():
    """touch() must move updated_at strictly forward."""
    import time

    entry = ArchiveEntry(title="T", content="c")
    before = entry.updated_at
    time.sleep(0.01)
    entry.touch()
    # Strict inequality: after a 10 ms sleep the microsecond-resolution
    # ISO timestamp is guaranteed to advance, so `>=` would also pass if
    # touch() never rewrote the field at all.
    assert entry.updated_at > before
def test_update_entry_title():
    """Title-only update: content untouched, hash changes."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "test_archive.json"
        archive = MnemosyneArchive(archive_path=path)
        e = ingest_event(archive, title="Old", content="content", topics=["x"])
        old_hash = e.content_hash
        updated = archive.update_entry(e.id, title="New Title")
        assert updated.title == "New Title"
        assert updated.content == "content"
        assert updated.updated_at >= e.created_at
        # The hash covers title AND content, so a title-only update
        # still produces a new content_hash.
        assert updated.content_hash != old_hash
def test_update_entry_content_relinks():
    """Changing content re-links the entry and strips stale back-refs."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "test_archive.json"
        archive = MnemosyneArchive(archive_path=path)
        e1 = ingest_event(archive, title="Python", content="Python programming language")
        e2 = ingest_event(archive, title="Java", content="Java programming language")
        # e1 and e2 should be linked via shared tokens
        assert e2.id in e1.links or e1.id in e2.links
        # Update e1 to completely different content
        archive.update_entry(e1.id, content="Cooking recipes for dinner")
        e1_fresh = archive.get(e1.id)
        e2_fresh = archive.get(e2.id)
        assert e1_fresh.content == "Cooking recipes for dinner"
        # update_entry removes e1 from every other entry's links before
        # re-linking, and the new content shares no tokens with e2, so
        # e2 must no longer reference e1.
        assert e1.id not in e2_fresh.links
def test_update_entry_metadata():
    # Passing a metadata dict replaces the stored metadata wholesale.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        entry = ingest_event(archive, title="T", content="c")
        archive.update_entry(entry.id, metadata={"key": "value"})
        reloaded = archive.get(entry.id)
        assert reloaded.metadata == {"key": "value"}
def test_update_entry_missing_raises():
    # Updating an unknown id must raise KeyError.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        raised = False
        try:
            archive.update_entry("nonexistent", title="X")
        except KeyError:
            raised = True
        assert raised, "Expected KeyError"
def test_update_entry_no_change_no_relink():
    # A metadata-only update leaves the content hash — and therefore
    # the holographic links — untouched.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        entry = ingest_event(archive, title="T", content="c", topics=["x"])
        links_before = list(entry.links)
        archive.update_entry(entry.id, metadata={"k": "v"})
        assert archive.get(entry.id).links == links_before
def test_find_by_hash():
    # An ingested entry is retrievable by its own content hash.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        entry = ingest_event(archive, title="Unique", content="Unique content xyz")
        hit = archive.find_by_hash(entry.content_hash)
        assert hit is not None
        assert hit.id == entry.id
def test_find_by_hash_miss():
    # An unknown hash yields None rather than raising.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        assert archive.find_by_hash("nonexistent-hash") is None
def test_find_duplicates():
    # Two entries with identical title+content form one duplicate group.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        first = ingest_event(archive, title="Same", content="Duplicate content")
        # Bypass add() to force a second entry with identical title+content.
        clone = ArchiveEntry(title="Same", content="Duplicate content", source="manual")
        archive._entries[clone.id] = clone
        archive._save()
        groups = archive.find_duplicates()
        assert len(groups) == 1
        (group,) = groups
        assert len(group) == 2
        assert {member.id for member in group} == {first.id, clone.id}
def test_find_duplicates_none():
    # Distinct entries produce no duplicate groups.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        ingest_event(archive, title="A", content="unique a")
        ingest_event(archive, title="B", content="unique b")
        assert archive.find_duplicates() == []
def test_add_skip_dups():
    # Adding an exact duplicate with skip_dups=True returns the existing
    # entry and stores nothing new.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        original = ingest_event(archive, title="Test", content="Content here")
        duplicate = ArchiveEntry(title="Test", content="Content here")
        returned = archive.add(duplicate, skip_dups=True)
        assert returned.id == original.id
        assert archive.count == 1
def test_add_skip_dups_allows_different():
    # Non-duplicate content is stored normally even with skip_dups=True.
    with tempfile.TemporaryDirectory() as tmp:
        archive = MnemosyneArchive(archive_path=Path(tmp) / "test_archive.json")
        ingest_event(archive, title="A", content="Content A")
        newcomer = ArchiveEntry(title="B", content="Content B")
        returned = archive.add(newcomer, skip_dups=True)
        assert returned.id == newcomer.id
        assert archive.count == 2
def test_entry_roundtrip_with_updated_at():
    # to_dict/from_dict must preserve updated_at and export content_hash.
    entry = ArchiveEntry(title="T", content="c", topics=["x"])
    payload = entry.to_dict()
    assert "content_hash" in payload
    restored = ArchiveEntry.from_dict(payload)
    assert restored.updated_at == entry.updated_at
def test_entry_from_dict_backfills_updated_at():
    """Legacy records without updated_at inherit created_at on load."""
    legacy = {
        "id": "test-id",
        "title": "Legacy",
        "content": "old entry",
        "source": "manual",
        "source_ref": None,
        "topics": [],
        "metadata": {},
        "created_at": "2025-01-01T00:00:00+00:00",
        "links": [],
    }
    entry = ArchiveEntry.from_dict(legacy)
    assert entry.updated_at == "2025-01-01T00:00:00+00:00"