Files
the-nexus/nexus/mnemosyne/archive.py
Alexander Whitestone 392c73eb03 feat(mnemosyne): graph cluster analysis — clusters, hubs, bridges, rebuild_links
- graph_clusters(): BFS connected component discovery with density + topic analysis
- hub_entries(): degree centrality ranking of most connected entries
- bridge_entries(): Tarjan's articulation points — entries that connect clusters
- rebuild_links(): full link recomputation after bulk ingestion
- _build_adjacency(): internal adjacency builder with validation
2026-04-11 18:42:32 -04:00

488 lines
18 KiB
Python

"""MnemosyneArchive — core archive class.
The living holographic archive. Stores entries, maintains links,
and provides query interfaces for retrieving connected knowledge.
"""
from __future__ import annotations

import json
import logging
from collections import deque
from pathlib import Path
from typing import Optional

from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
# Version tag written under the "version" key of the dict returned by
# MnemosyneArchive.export().
_EXPORT_VERSION = "1"
class MnemosyneArchive:
    """The holographic archive — stores and links entries.

    Phase 1 uses JSON file storage. Phase 2 will integrate with
    MemPalace (ChromaDB) for vector-semantic search.

    Entries live in an in-memory ``dict`` keyed by entry id and are written
    back to ``self.path`` after every mutating operation.
    """

    def __init__(self, archive_path: Optional[Path] = None):
        """Open (or create) the archive stored at *archive_path*.

        Args:
            archive_path: JSON file backing the archive. Defaults to
                ``~/.hermes/mnemosyne/archive.json``. The parent directory is
                created when missing.
        """
        self.path = archive_path or Path.home() / ".hermes" / "mnemosyne" / "archive.json"
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.linker = HolographicLinker()
        self._entries: dict[str, ArchiveEntry] = {}
        self._load()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def _load(self):
        """Populate ``self._entries`` from ``self.path`` if the file exists.

        Loading is best-effort: a JSON syntax error or a missing key stops the
        load with a logged warning instead of raising. Entries parsed before
        the failure stay in memory.
        """
        if not self.path.exists():
            return
        try:
            with open(self.path, encoding="utf-8") as f:
                data = json.load(f)
            for entry_data in data.get("entries", []):
                entry = ArchiveEntry.from_dict(entry_data)
                self._entries[entry.id] = entry
        except (json.JSONDecodeError, KeyError) as exc:
            # Deliberately non-fatal, but no longer silent: the next _save()
            # overwrites the file, so the operator should know data was skipped.
            logging.getLogger(__name__).warning(
                "Could not fully load archive %s (%s: %s)",
                self.path,
                type(exc).__name__,
                exc,
            )

    def _save(self):
        """Write every entry (plus a ``count`` field) to ``self.path`` as JSON."""
        data = {
            "entries": [e.to_dict() for e in self._entries.values()],
            "count": len(self._entries),
        }
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _searchable_text(entry) -> str:
        """Return title, content and topics of *entry* joined into one string."""
        return f"{entry.title} {entry.content} {' '.join(entry.topics)}"

    def _inbound_counts(self) -> dict[str, int]:
        """Return ``{entry id: number of link references pointing at it}``.

        Links that point at ids not present in the archive are ignored.
        """
        inbound: dict[str, int] = {eid: 0 for eid in self._entries}
        for entry in self._entries.values():
            for linked_id in entry.links:
                if linked_id in inbound:
                    inbound[linked_id] += 1
        return inbound

    # ------------------------------------------------------------------
    # CRUD
    # ------------------------------------------------------------------
    def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
        """Add an entry to the archive. Auto-links to related entries.

        Args:
            entry: The entry to store; an existing entry with the same id is
                replaced.
            auto_link: When true, the linker is run for the new entry against
                all entries currently in the archive.

        Returns:
            The entry that was passed in.
        """
        self._entries[entry.id] = entry
        if auto_link:
            self.linker.apply_links(entry, list(self._entries.values()))
        self._save()
        return entry

    def get(self, entry_id: str) -> Optional[ArchiveEntry]:
        """Return the entry with *entry_id*, or ``None`` when it is unknown."""
        return self._entries.get(entry_id)

    def remove(self, entry_id: str) -> bool:
        """Remove an entry and clean up all bidirectional links.

        Returns True if the entry existed and was removed, False otherwise.
        """
        if entry_id not in self._entries:
            return False
        # Strip every reference to the removed id (not just the first one) so
        # no dangling back-link survives.
        for other in self._entries.values():
            if entry_id in other.links:
                other.links = [lid for lid in other.links if lid != entry_id]
        del self._entries[entry_id]
        self._save()
        return True

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------
    def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
        """Simple keyword search across titles, content and topics.

        Each whitespace-separated query token that occurs as a substring of
        the entry text (case-insensitive) counts as one hit.

        Args:
            query: Search string.
            limit: Maximum number of results to return.

        Returns:
            Entries with at least one hit, most hits first.
        """
        query_tokens = set(query.lower().split())
        scored = []
        for entry in self._entries.values():
            text = self._searchable_text(entry).lower()
            hits = sum(1 for token in query_tokens if token in text)
            if hits > 0:
                scored.append((hits, entry))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in scored[:limit]]

    def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
        """Semantic search using holographic linker similarity.

        Scores each entry by Jaccard similarity between query tokens and entry
        tokens, then boosts entries with more inbound links (more "holographic").
        Falls back to keyword search if no entries meet the similarity threshold.

        Args:
            query: Natural language query string.
            limit: Maximum number of results to return.
            threshold: Minimum Jaccard similarity to be considered a semantic match.

        Returns:
            List of ArchiveEntry sorted by combined relevance score, descending.
        """
        query_tokens = HolographicLinker._tokenize(query)
        if not query_tokens:
            return []

        inbound = self._inbound_counts()
        # "or 1" guards the division below when nothing has inbound links.
        max_inbound = max(inbound.values(), default=1) or 1

        scored = []
        for entry in self._entries.values():
            entry_tokens = HolographicLinker._tokenize(self._searchable_text(entry))
            if not entry_tokens:
                continue
            jaccard = len(query_tokens & entry_tokens) / len(query_tokens | entry_tokens)
            if jaccard >= threshold:
                link_boost = inbound[entry.id] / max_inbound * 0.2  # up to 20% boost
                scored.append((jaccard + link_boost, entry))

        if scored:
            scored.sort(key=lambda pair: pair[0], reverse=True)
            return [entry for _, entry in scored[:limit]]
        # Graceful fallback to keyword search
        return self.search(query, limit=limit)

    def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
        """Get entries linked to a given entry, up to specified depth.

        Follows outgoing ``links`` breadth-first. Each reachable entry appears
        exactly once, in the order it was first discovered; the starting entry
        itself is never included. Links to unknown ids are skipped.

        Args:
            entry_id: Id of the entry to start from.
            depth: Maximum number of link hops to follow.

        Returns:
            List of distinct linked entries (empty if *entry_id* is unknown).
        """
        seen = {entry_id}
        frontier = [entry_id]
        result: list[ArchiveEntry] = []
        for _ in range(depth):
            next_frontier: list[str] = []
            for eid in frontier:
                entry = self._entries.get(eid)
                if entry is None:
                    continue
                for linked_id in entry.links:
                    if linked_id in seen:
                        continue
                    linked = self._entries.get(linked_id)
                    if linked is None:
                        continue
                    # Mark at discovery time so a node reachable from several
                    # frontier members is reported only once.
                    seen.add(linked_id)
                    result.append(linked)
                    next_frontier.append(linked_id)
            if not next_frontier:
                break
            frontier = next_frontier
        return result

    def by_topic(self, topic: str) -> list[ArchiveEntry]:
        """Get all entries tagged with a topic (case-insensitive match)."""
        topic_lower = topic.lower()
        return [
            e for e in self._entries.values()
            if topic_lower in [t.lower() for t in e.topics]
        ]

    def export(
        self,
        query: Optional[str] = None,
        topics: Optional[list[str]] = None,
    ) -> dict:
        """Export a filtered subset of the archive.

        Args:
            query: keyword filter applied to title + content + topics
                (case-insensitive); an entry passes when any token matches.
            topics: list of topic tags; entries must match at least one.

        Returns:
            A JSON-serialisable dict with an ``entries`` list and metadata
            (``version``, ``filters``, ``count``).
        """
        candidates = list(self._entries.values())
        if topics:
            lower_topics = {t.lower() for t in topics}
            candidates = [
                e for e in candidates
                if any(t.lower() in lower_topics for t in e.topics)
            ]
        if query:
            query_tokens = set(query.lower().split())
            candidates = [
                e for e in candidates
                if any(token in self._searchable_text(e).lower() for token in query_tokens)
            ]
        return {
            "version": _EXPORT_VERSION,
            "filters": {"query": query, "topics": topics},
            "count": len(candidates),
            "entries": [e.to_dict() for e in candidates],
        }

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------
    def topic_counts(self) -> dict[str, int]:
        """Return a dict mapping topic name → entry count, sorted by count desc."""
        counts: dict[str, int] = {}
        for entry in self._entries.values():
            for topic in entry.topics:
                counts[topic] = counts.get(topic, 0) + 1
        return dict(sorted(counts.items(), key=lambda item: item[1], reverse=True))

    @property
    def count(self) -> int:
        """Number of entries currently in the archive."""
        return len(self._entries)

    def stats(self) -> dict:
        """Return summary statistics for the archive.

        Returns:
            Dict with keys ``entries``, ``total_links``, ``unique_topics``,
            ``topics`` (sorted), ``orphans`` (entries without outgoing links),
            ``link_density`` (average links per entry, 0.0 when empty),
            ``oldest_entry`` and ``newest_entry`` (min/max ``created_at`` or
            ``None`` when empty).
        """
        entries = list(self._entries.values())
        n = len(entries)
        total_links = sum(len(e.links) for e in entries)
        topics: set[str] = set()
        for e in entries:
            topics.update(e.topics)
        # Orphans: entries with no links at all
        orphans = sum(1 for e in entries if len(e.links) == 0)
        # Link density: average links per entry (0 when empty)
        link_density = round(total_links / n, 4) if n else 0.0
        # Age distribution
        timestamps = [e.created_at for e in entries]
        return {
            "entries": n,
            "total_links": total_links,
            "unique_topics": len(topics),
            "topics": sorted(topics),
            "orphans": orphans,
            "link_density": link_density,
            "oldest_entry": min(timestamps) if timestamps else None,
            "newest_entry": max(timestamps) if timestamps else None,
        }

    # ------------------------------------------------------------------
    # Graph analysis
    # ------------------------------------------------------------------
    def _build_adjacency(self) -> dict[str, set[str]]:
        """Build adjacency dict from entry links. Only includes valid references.

        The result is undirected (a link in either direction creates an edge
        both ways); self-links and links to unknown ids are dropped.
        """
        adj: dict[str, set[str]] = {eid: set() for eid in self._entries}
        for eid, entry in self._entries.items():
            for linked_id in entry.links:
                if linked_id in self._entries and linked_id != eid:
                    adj[eid].add(linked_id)
                    adj[linked_id].add(eid)
        return adj

    def graph_clusters(self, min_size: int = 1) -> list[dict]:
        """Find connected component clusters in the holographic graph.

        Uses BFS to discover groups of entries that are reachable from each
        other through their links. Returns clusters sorted by size descending.

        Args:
            min_size: Minimum cluster size to include (filters out isolated entries).

        Returns:
            List of dicts with keys: cluster_id, size, entries (list of entry
            ids), top_topics (up to five most frequent topics),
            internal_edges, density.
        """
        adj = self._build_adjacency()
        visited: set[str] = set()
        clusters: list[dict] = []
        cluster_id = 0
        for start in self._entries:
            if start in visited:
                continue
            # BFS from this entry
            component: list[str] = []
            queue = deque([start])
            while queue:
                current = queue.popleft()
                if current in visited:
                    continue
                visited.add(current)
                component.append(current)
                for neighbor in adj.get(current, set()):
                    if neighbor not in visited:
                        queue.append(neighbor)

            n = len(component)
            if n < min_size:
                continue

            # Collect topics and count edges inside the component
            cluster_topics: dict[str, int] = {}
            degree_sum = 0
            for cid in component:
                for topic in self._entries[cid].topics:
                    cluster_topics[topic] = cluster_topics.get(topic, 0) + 1
                degree_sum += len(adj.get(cid, set()))
            internal_edges = degree_sum // 2  # undirected, counted twice

            # Density: actual edges / possible edges
            max_edges = n * (n - 1) // 2
            density = round(internal_edges / max_edges, 4) if max_edges > 0 else 0.0

            # Top topics by frequency
            top_topics = sorted(cluster_topics.items(), key=lambda item: item[1], reverse=True)[:5]
            clusters.append({
                "cluster_id": cluster_id,
                "size": n,
                "entries": component,
                "top_topics": [topic for topic, _ in top_topics],
                "internal_edges": internal_edges,
                "density": density,
            })
            cluster_id += 1
        clusters.sort(key=lambda c: c["size"], reverse=True)
        return clusters

    def hub_entries(self, limit: int = 10) -> list[dict]:
        """Find the most connected entries (highest degree centrality).

        These are the "hubs" of the holographic graph — entries that bridge
        many topics and attract many links.

        Args:
            limit: Maximum number of hubs to return.

        Returns:
            List of dicts with keys: entry, degree, inbound, outbound, topics.
            Entries with degree 0 are omitted.
        """
        adj = self._build_adjacency()
        inbound = self._inbound_counts()
        hubs = []
        for eid, entry in self._entries.items():
            degree = len(adj.get(eid, set()))
            if degree == 0:
                continue
            hubs.append({
                "entry": entry,
                "degree": degree,
                "inbound": inbound.get(eid, 0),
                "outbound": len(entry.links),
                "topics": entry.topics,
            })
        hubs.sort(key=lambda h: h["degree"], reverse=True)
        return hubs[:limit]

    @staticmethod
    def _articulation_split_counts(sub_adj: dict[str, set[str]]) -> dict[str, int]:
        """Return ``{articulation point: components left after removing it}``.

        Iterative form of Tarjan's articulation-point DFS over the undirected
        graph *sub_adj* (an explicit stack is used so deep graphs cannot hit
        Python's recursion limit). For a non-root vertex ``u`` every DFS child
        ``v`` with ``low[v] >= disc[u]`` is cut off when ``u`` is removed,
        giving ``cut_children + 1`` components; for a DFS root the number of
        components equals its number of DFS children.
        """
        discovery: dict[str, int] = {}
        low: dict[str, int] = {}
        cut_children: dict[str, int] = {}
        result: dict[str, int] = {}
        timer = 0
        for root in sub_adj:
            if root in discovery:
                continue
            discovery[root] = low[root] = timer
            timer += 1
            root_children = 0
            # Each frame: (node, its DFS parent, iterator over its neighbours)
            stack = [(root, None, iter(sub_adj[root]))]
            while stack:
                node, parent, neighbours = stack[-1]
                descended = False
                for nb in neighbours:
                    if nb not in discovery:
                        discovery[nb] = low[nb] = timer
                        timer += 1
                        stack.append((nb, node, iter(sub_adj[nb])))
                        descended = True
                        break
                    if nb != parent:
                        # Back edge to an already discovered vertex
                        low[node] = min(low[node], discovery[nb])
                if descended:
                    continue
                # All neighbours handled: "return" from node to its parent.
                stack.pop()
                if parent is None:
                    continue
                low[parent] = min(low[parent], low[node])
                if parent == root:
                    root_children += 1
                elif low[node] >= discovery[parent]:
                    cut_children[parent] = cut_children.get(parent, 0) + 1
            if root_children > 1:
                result[root] = root_children
        for node, cut in cut_children.items():
            result[node] = cut + 1
        return result

    def bridge_entries(self) -> list[dict]:
        """Find articulation points — entries whose removal would split a cluster.

        These are "bridge" entries in the holographic graph. Removing them
        disconnects members that were previously reachable through the bridge.
        Uses Tarjan's algorithm for finding articulation points. Only clusters
        with at least three entries are examined.

        Returns:
            List of dicts with keys: entry, cluster_size,
            components_after_removal, topics — sorted by
            components_after_removal descending.
        """
        adj = self._build_adjacency()
        bridges: list[dict] = []
        for cluster in self.graph_clusters(min_size=3):
            members = set(cluster["entries"])
            if len(members) < 3:
                continue
            # Restrict adjacency to this cluster
            sub_adj = {eid: adj[eid] & members for eid in cluster["entries"]}
            split_counts = self._articulation_split_counts(sub_adj)
            # Iterate ids in sorted order so ties come out deterministically.
            for ap_id in sorted(split_counts):
                component_count = split_counts[ap_id]
                if component_count > 1:
                    ap_entry = self._entries[ap_id]
                    bridges.append({
                        "entry": ap_entry,
                        "cluster_size": cluster["size"],
                        "components_after_removal": component_count,
                        "topics": ap_entry.topics,
                    })
        bridges.sort(key=lambda b: b["components_after_removal"], reverse=True)
        return bridges

    def rebuild_links(self, threshold: Optional[float] = None) -> int:
        """Recompute all links from scratch.

        Clears existing links and re-applies the holographic linker to every
        entry pair. Useful after bulk ingestion or threshold changes.

        Args:
            threshold: Override the linker's default similarity threshold for
                the duration of this call. The previous threshold is restored
                afterwards, even if linking raises.

        Returns:
            Total number of links created (sum of the linker's return values).
        """
        override = threshold is not None
        if override:
            old_threshold = self.linker.threshold
            self.linker.threshold = threshold
        try:
            # Clear all links
            for entry in self._entries.values():
                entry.links = []
            entries = list(self._entries.values())
            total_links = 0
            # Re-link each entry against all others
            for entry in entries:
                candidates = [e for e in entries if e.id != entry.id]
                total_links += self.linker.apply_links(entry, candidates)
        finally:
            if override:
                self.linker.threshold = old_threshold
        self._save()
        return total_links