Files
the-nexus/nexus/mnemosyne/archive.py
Claude (Opus 4.6) 909a61702e
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
[claude] Mnemosyne: semantic search via holographic linker similarity (#1223) (#1225)
2026-04-11 20:19:52 +00:00

244 lines
8.8 KiB
Python

"""MnemosyneArchive — core archive class.
The living holographic archive. Stores entries, maintains links,
and provides query interfaces for retrieving connected knowledge.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
# Version string written under the "version" key of the dict returned by
# MnemosyneArchive.export().
_EXPORT_VERSION = "1"
class MnemosyneArchive:
    """The holographic archive — stores and links entries.

    Entries live in an in-memory dict keyed by entry id and are written back
    to a single JSON file after every mutation (``add`` / ``remove``).

    Phase 1 uses JSON file storage. Phase 2 will integrate with
    MemPalace (ChromaDB) for vector-semantic search.
    """

    def __init__(self, archive_path: Optional[Path] = None):
        """Open the archive at *archive_path*, creating its directory if needed.

        Args:
            archive_path: Location of the JSON archive file. A plain string is
                accepted as well and converted to a ``Path``. When omitted (or
                falsy) ``~/.hermes/mnemosyne/archive.json`` is used.
        """
        if archive_path:
            # Coerce so that callers passing a str do not break `.parent` below.
            self.path = Path(archive_path)
        else:
            self.path = Path.home() / ".hermes" / "mnemosyne" / "archive.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.linker = HolographicLinker()
        self._entries: dict[str, ArchiveEntry] = {}
        self._load()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _load(self):
        """Merge the entries stored in the JSON file into the in-memory index.

        Loading is best-effort and all-or-nothing: a missing file, undecodable
        or malformed JSON, a non-object top level, or a record that cannot be
        turned into an entry leaves the index exactly as it was, so a corrupt
        file never produces a half-loaded archive.
        """
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return  # Nothing persisted yet.
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            return  # Start fresh on corrupt data
        if not isinstance(data, dict):
            return  # Start fresh on corrupt data

        loaded: dict[str, ArchiveEntry] = {}
        try:
            for entry_data in data.get("entries", []):
                entry = ArchiveEntry.from_dict(entry_data)
                loaded[entry.id] = entry
        except (KeyError, TypeError, ValueError):
            return  # Start fresh on corrupt data
        # Commit only after every record parsed successfully.
        self._entries.update(loaded)

    def _save(self):
        """Write every entry to ``self.path`` as indented JSON.

        The payload is serialised before the disk is touched and is written to
        a sibling ``<name>.tmp`` file that is then renamed over the target, so
        a serialisation error or an interrupted write cannot truncate the
        existing archive.
        """
        data = {
            "entries": [e.to_dict() for e in self._entries.values()],
            "count": len(self._entries),
        }
        payload = json.dumps(data, indent=2)
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self.path)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _entry_text(entry: ArchiveEntry) -> str:
        """Return the title, content and topics of *entry* as one string."""
        return f"{entry.title} {entry.content} {' '.join(entry.topics)}"

    @classmethod
    def _keyword_hits(cls, entry: ArchiveEntry, query_tokens: set[str]) -> int:
        """Count how many lower-cased *query_tokens* occur as substrings of *entry*."""
        text = cls._entry_text(entry).lower()
        return sum(1 for token in query_tokens if token in text)

    # ------------------------------------------------------------------
    # Mutation and lookup
    # ------------------------------------------------------------------

    def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
        """Add an entry to the archive. Auto-links to related entries.

        An existing entry with the same id is replaced. The archive is saved
        before returning.

        Args:
            entry: The entry to store.
            auto_link: When true, let the linker connect *entry* to the
                entries currently in the archive.

        Returns:
            The same *entry* object that was passed in.
        """
        self._entries[entry.id] = entry
        if auto_link:
            # NOTE(review): the candidate list includes `entry` itself;
            # presumably the linker skips self-matches — confirm.
            self.linker.apply_links(entry, list(self._entries.values()))
        self._save()
        return entry

    def get(self, entry_id: str) -> Optional[ArchiveEntry]:
        """Return the entry stored under *entry_id*, or ``None`` if absent."""
        return self._entries.get(entry_id)

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
        """Simple keyword search across titles, content and topics.

        The query is split on whitespace; an entry scores one point for each
        distinct token that appears (case-insensitively, as a substring) in
        its text. Entries with no hit are dropped.

        Args:
            query: Whitespace-separated keywords.
            limit: Maximum number of results; values <= 0 yield ``[]``.

        Returns:
            Matching entries, highest hit count first. Ties keep archive
            insertion order.
        """
        if limit <= 0:
            return []
        query_tokens = set(query.lower().split())
        if not query_tokens:
            return []
        scored = []
        for entry in self._entries.values():
            hits = self._keyword_hits(entry, query_tokens)
            if hits:
                scored.append((hits, entry))
        # list.sort is stable, also with reverse=True, so ties keep their order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored[:limit]]

    def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
        """Semantic search using holographic linker similarity.

        Scores each entry by Jaccard similarity between query tokens and entry
        tokens, then boosts entries with more inbound links (more "holographic").
        Falls back to keyword search if no entries meet the similarity threshold.

        Args:
            query: Natural language query string.
            limit: Maximum number of results to return; values <= 0 yield ``[]``.
            threshold: Minimum Jaccard similarity to be considered a semantic match.

        Returns:
            List of ArchiveEntry sorted by combined relevance score, descending.
            Empty when the query produces no tokens.
        """
        if limit <= 0:
            return []
        # NOTE(review): relies on _tokenize returning a set (the results are
        # combined with & and | below) — confirm against the linker.
        query_tokens = HolographicLinker._tokenize(query)
        if not query_tokens:
            return []

        # Count inbound links for each entry (how many links point TO it).
        inbound: dict[str, int] = {eid: 0 for eid in self._entries}
        for entry in self._entries.values():
            for linked_id in entry.links:
                if linked_id in inbound:
                    inbound[linked_id] += 1
        # `or 1` avoids dividing by zero when nothing is linked.
        max_inbound = max(inbound.values(), default=1) or 1

        scored = []
        for entry in self._entries.values():
            entry_tokens = HolographicLinker._tokenize(self._entry_text(entry))
            if not entry_tokens:
                continue
            intersection = query_tokens & entry_tokens
            union = query_tokens | entry_tokens
            jaccard = len(intersection) / len(union)
            if jaccard >= threshold:
                link_boost = inbound.get(entry.id, 0) / max_inbound * 0.2  # up to 20% boost
                scored.append((jaccard + link_boost, entry))

        if not scored:
            # Graceful fallback to keyword search
            return self.search(query, limit=limit)
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored[:limit]]

    # ------------------------------------------------------------------
    # Graph queries
    # ------------------------------------------------------------------

    def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
        """Get entries linked to a given entry, up to specified depth.

        Performs a breadth-first walk over outgoing links. Each reachable
        entry is returned exactly once, in the order it is first discovered;
        the starting entry itself is never included, and link ids that are
        not in the archive are ignored.

        Args:
            entry_id: Id of the entry to start from.
            depth: Number of link hops to follow.

        Returns:
            The reachable entries; empty if *entry_id* is unknown.
        """
        # Mark ids as seen when they are *queued*, not when they are expanded,
        # so a target reachable through several links is reported only once.
        seen = {entry_id}
        frontier = [entry_id]
        result = []
        for _ in range(depth):
            next_frontier = []
            for eid in frontier:
                entry = self._entries.get(eid)
                if entry is None:
                    continue
                for linked_id in entry.links:
                    if linked_id in seen:
                        continue
                    linked = self._entries.get(linked_id)
                    if linked is None:
                        continue  # Dangling link to an entry not in the archive.
                    seen.add(linked_id)
                    result.append(linked)
                    next_frontier.append(linked_id)
            if not next_frontier:
                break
            frontier = next_frontier
        return result

    def by_topic(self, topic: str) -> list[ArchiveEntry]:
        """Get all entries tagged with a topic (case-insensitive match)."""
        topic_lower = topic.lower()
        return [
            e for e in self._entries.values()
            if any(t.lower() == topic_lower for t in e.topics)
        ]

    def remove(self, entry_id: str) -> bool:
        """Remove an entry and clean up all bidirectional links.

        Every occurrence of *entry_id* is stripped from the ``links`` of the
        remaining entries, then the archive is saved.

        Returns True if the entry existed and was removed, False otherwise.
        """
        if entry_id not in self._entries:
            return False
        # Remove back-links from all other entries. Loop because a list of
        # links may hold the same id more than once.
        for other in self._entries.values():
            while entry_id in other.links:
                other.links.remove(entry_id)
        del self._entries[entry_id]
        self._save()
        return True

    # ------------------------------------------------------------------
    # Export and statistics
    # ------------------------------------------------------------------

    def export(
        self,
        query: Optional[str] = None,
        topics: Optional[list[str]] = None,
    ) -> dict:
        """Export a filtered subset of the archive.

        Both filters are optional and combine with AND.

        Args:
            query: keyword filter applied to title + content + topics
                (case-insensitive); an entry passes if any whitespace-separated
                token occurs in its text
            topics: list of topic tags; entries must match at least one
                (case-insensitive)

        Returns a JSON-serialisable dict with an ``entries`` list and metadata.
        """
        candidates = list(self._entries.values())
        if topics:
            lower_topics = {t.lower() for t in topics}
            candidates = [
                e for e in candidates
                if any(t.lower() in lower_topics for t in e.topics)
            ]
        if query:
            query_tokens = set(query.lower().split())
            candidates = [
                e for e in candidates
                if self._keyword_hits(e, query_tokens) > 0
            ]
        return {
            "version": _EXPORT_VERSION,
            "filters": {"query": query, "topics": topics},
            "count": len(candidates),
            "entries": [e.to_dict() for e in candidates],
        }

    def topic_counts(self) -> dict[str, int]:
        """Return a dict mapping topic name → entry count, sorted by count desc.

        Topic names are counted exactly as stored (case-sensitive).
        """
        counts: dict[str, int] = {}
        for entry in self._entries.values():
            for topic in entry.topics:
                counts[topic] = counts.get(topic, 0) + 1
        return dict(sorted(counts.items(), key=lambda item: item[1], reverse=True))

    @property
    def count(self) -> int:
        """Number of entries currently held in the archive."""
        return len(self._entries)

    def stats(self) -> dict:
        """Return summary statistics about the archive.

        Returns:
            A dict with the keys ``entries``, ``total_links``,
            ``unique_topics``, ``topics`` (sorted), ``orphans`` (entries with
            no outgoing links), ``link_density`` (average links per entry,
            rounded to 4 places, 0.0 when empty), ``oldest_entry`` and
            ``newest_entry`` (smallest / largest ``created_at``, ``None`` when
            the archive is empty).
        """
        entries = list(self._entries.values())
        n = len(entries)
        total_links = sum(len(e.links) for e in entries)

        topics: set[str] = set()
        for e in entries:
            topics.update(e.topics)

        # Orphans: entries with no links at all
        orphans = sum(1 for e in entries if len(e.links) == 0)
        # Link density: average links per entry (0 when empty)
        link_density = round(total_links / n, 4) if n else 0.0

        # Age distribution — only the extremes are needed, so avoid a full sort.
        created = [e.created_at for e in entries]
        oldest_entry = min(created, default=None)
        newest_entry = max(created, default=None)

        return {
            "entries": n,
            "total_links": total_links,
            "unique_topics": len(topics),
            "topics": sorted(topics),
            "orphans": orphans,
            "link_density": link_density,
            "oldest_entry": oldest_entry,
            "newest_entry": newest_entry,
        }