Files
the-nexus/nexus/mnemosyne/archive.py
Claude (Opus 4.6) b3939179b9
Some checks failed
Deploy Nexus / deploy (push) Failing after 2s
Staging Verification Gate / verify-staging (push) Failing after 3s
[claude] Add temporal query methods: by_date_range and temporal_neighbors (#1244) (#1246)
2026-04-12 01:03:50 +00:00

750 lines
27 KiB
Python

"""MnemosyneArchive — core archive class.
The living holographic archive. Stores entries, maintains links,
and provides query interfaces for retrieving connected knowledge.
"""
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
from nexus.mnemosyne.linker import HolographicLinker
# Version string stamped into the "version" field of the dict returned by
# MnemosyneArchive.export().
_EXPORT_VERSION = "1"
class MnemosyneArchive:
"""The holographic archive — stores and links entries.
Phase 1 uses JSON file storage. Phase 2 will integrate with
MemPalace (ChromaDB) for vector-semantic search.
"""
def __init__(self, archive_path: Optional[Path] = None):
    """Open the archive, creating its parent directory if needed.

    Args:
        archive_path: Location of the JSON archive file. When omitted (or
            falsy), ``~/.hermes/mnemosyne/archive.json`` is used.
    """
    if archive_path:
        self.path = archive_path
    else:
        self.path = Path.home() / ".hermes" / "mnemosyne" / "archive.json"

    # The directory must exist before the first read or write.
    storage_dir = self.path.parent
    storage_dir.mkdir(parents=True, exist_ok=True)

    self.linker = HolographicLinker()
    self._entries: dict[str, ArchiveEntry] = {}

    # Fill the in-memory index from whatever is already on disk.
    self._load()
def _load(self):
    """Populate ``self._entries`` from the JSON file at ``self.path``.

    A missing file leaves the archive untouched. If the file cannot be
    decoded, or an entry cannot be rebuilt, nothing from the file is kept:
    entries are staged in a temporary dict and only merged once the whole
    file has parsed, so a failure half-way through never leaves a
    partially loaded archive. The failure is logged instead of being
    silently swallowed.
    """
    if not self.path.exists():
        return

    staged: dict[str, ArchiveEntry] = {}
    try:
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(self.path, encoding="utf-8") as f:
            data = json.load(f)
        for entry_data in data.get("entries", []):
            entry = ArchiveEntry.from_dict(entry_data)
            staged[entry.id] = entry
    except (json.JSONDecodeError, UnicodeDecodeError, KeyError) as exc:
        import logging

        # Start fresh on corrupt data, but leave a trace for the operator.
        logging.getLogger(__name__).warning(
            "Ignoring unreadable archive at %s: %s", self.path, exc
        )
        return

    self._entries.update(staged)
def _save(self):
    """Write every entry to ``self.path`` as indented JSON.

    The payload has two keys: ``entries`` (each entry's ``to_dict()``) and
    ``count``. The data is written to a sibling ``<name>.tmp`` file and
    then moved over the real file, so a serialisation error or crash
    mid-write cannot truncate the existing archive.

    Raises:
        TypeError: If an entry's ``to_dict()`` output is not JSON
            serialisable (the previous file is left intact).
        OSError: If the file cannot be written or replaced.
    """
    data = {
        "entries": [e.to_dict() for e in self._entries.values()],
        "count": len(self._entries),
    }
    tmp_path = self.path.with_name(self.path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        # Path.replace overwrites the destination in a single rename.
        tmp_path.replace(self.path)
    except Exception:
        # Do not leave a half-written temp file behind.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
def find_duplicate(self, entry: ArchiveEntry) -> Optional[ArchiveEntry]:
    """Look for a different stored entry that shares ``entry``'s content hash.

    Args:
        entry: Entry whose ``content_hash`` is compared against the archive.

    Returns:
        The first stored entry (in insertion order) with an equal
        ``content_hash`` and a different ``id``, or ``None`` if there is none.
    """
    return next(
        (
            stored
            for stored in self._entries.values()
            if stored.id != entry.id and stored.content_hash == entry.content_hash
        ),
        None,
    )
def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
    """Store ``entry`` unless its content is already archived.

    Args:
        entry: The entry to store.
        auto_link: When True, run the holographic linker for the new entry
            against every entry currently in the archive.

    Returns:
        ``entry`` if it was stored, otherwise the existing entry that has
        the same content hash (nothing is stored or saved in that case).
    """
    existing = self.find_duplicate(entry)
    if existing is None:
        self._entries[entry.id] = entry
        if auto_link:
            everything = list(self._entries.values())
            self.linker.apply_links(entry, everything)
        self._save()
        return entry
    return existing
def update_entry(
    self,
    entry_id: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    metadata: Optional[dict] = None,
    auto_link: bool = True,
) -> ArchiveEntry:
    """Modify the title, content and/or metadata of a stored entry.

    ``updated_at`` is refreshed on every call. The content hash is
    recomputed, and links are rebuilt, only when the title or content
    actually changed.

    Args:
        entry_id: ID of the entry to update.
        title: Replacement title; ``None`` keeps the current one.
        content: Replacement content; ``None`` keeps the current one.
        metadata: Keys to merge into the entry's metadata (existing keys
            are overwritten); ``None`` leaves metadata alone.
        auto_link: When True and the text changed, drop every link to and
            from this entry and run the holographic linker again.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)

    title_differs = title is not None and title != entry.title
    if title_differs:
        entry.title = title
    body_differs = content is not None and content != entry.content
    if body_differs:
        entry.content = content
    text_changed = title_differs or body_differs

    if metadata is not None:
        entry.metadata.update(metadata)

    if text_changed:
        entry.content_hash = _compute_content_hash(entry.title, entry.content)
    entry.updated_at = datetime.now(timezone.utc).isoformat()

    if text_changed and auto_link:
        # Forget links pointing at this entry, then its own, before relinking.
        for peer in self._entries.values():
            if entry_id in peer.links:
                peer.links.remove(entry_id)
        entry.links = []
        self.linker.apply_links(entry, list(self._entries.values()))

    self._save()
    return entry
def get(self, entry_id: str) -> Optional[ArchiveEntry]:
    """Return the entry stored under ``entry_id``, or ``None`` if unknown."""
    try:
        return self._entries[entry_id]
    except KeyError:
        return None
def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
    """Rank entries by how many query words occur in their text.

    The query is lower-cased and split on whitespace. Each distinct word
    scores one point if it appears as a substring of the entry's
    lower-cased title, content and topics.

    Args:
        query: Whitespace-separated keywords.
        limit: Maximum number of entries to return.

    Returns:
        Entries with at least one hit, highest score first; ties keep
        archive insertion order.
    """
    needles = set(query.lower().split())
    ranked = []
    for entry in self._entries.values():
        haystack = f"{entry.title} {entry.content} {' '.join(entry.topics)}".lower()
        score = len([needle for needle in needles if needle in haystack])
        if score:
            ranked.append((score, entry))
    ranked = sorted(ranked, key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in ranked[:limit]]
def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
    """Rank entries by token overlap with the query, boosted by inbound links.

    Each entry is scored with the Jaccard similarity between the query's
    tokens and the tokens of its title, content and topics (both produced
    by ``HolographicLinker._tokenize``). Entries at or above ``threshold``
    get an extra boost of up to 0.2, proportional to how many other
    entries link to them. When no entry reaches the threshold the call
    falls back to ``search``.

    Args:
        query: Natural language query string.
        limit: Maximum number of results to return.
        threshold: Minimum Jaccard similarity for an entry to qualify.

    Returns:
        Entries sorted by combined score, descending. An empty list when
        the query yields no tokens.
    """
    query_tokens = HolographicLinker._tokenize(query)
    if not query_tokens:
        return []

    # Tally, per entry, how many links in the archive point at it.
    inbound: dict[str, int] = dict.fromkeys(self._entries, 0)
    for source in self._entries.values():
        for target_id in source.links:
            if target_id in inbound:
                inbound[target_id] += 1
    # Guard against dividing by zero when nothing is linked at all.
    max_inbound = max(inbound.values(), default=1) or 1

    ranked = []
    for entry in self._entries.values():
        entry_tokens = HolographicLinker._tokenize(f"{entry.title} {entry.content} {' '.join(entry.topics)}")
        if not entry_tokens:
            continue
        shared = len(query_tokens & entry_tokens)
        combined = len(query_tokens | entry_tokens)
        jaccard = shared / combined
        if jaccard >= threshold:
            link_boost = inbound[entry.id] / max_inbound * 0.2  # up to 20% boost
            ranked.append((jaccard + link_boost, entry))

    if not ranked:
        # Nothing was similar enough: degrade to plain keyword search.
        return self.search(query, limit=limit)

    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in ranked[:limit]]
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
    """Collect the entries reachable from ``entry_id`` by following links.

    The traversal is breadth-first. Each reachable entry is returned
    exactly once, in the order it is first discovered, even when several
    entries link to it or the graph contains cycles. The starting entry
    itself is never included, and link IDs that do not resolve to a stored
    entry are ignored.

    Args:
        entry_id: ID of the entry to start from.
        depth: Maximum number of link hops to follow.

    Returns:
        The linked entries, nearest first. Empty when ``entry_id`` is
        unknown, has no resolvable links, or ``depth`` is below 1.
    """
    # Mark IDs as seen when first encountered (not when expanded), so a
    # node reached from two frontier nodes is only reported once.
    seen = {entry_id}
    frontier = [entry_id]
    result = []
    for _ in range(depth):
        next_frontier = []
        for eid in frontier:
            entry = self._entries.get(eid)
            if not entry:
                continue
            for linked_id in entry.links:
                if linked_id in seen:
                    continue
                seen.add(linked_id)
                linked = self._entries.get(linked_id)
                if linked:
                    result.append(linked)
                    next_frontier.append(linked_id)
        if not next_frontier:
            break  # nothing new to expand; deeper levels would be empty
        frontier = next_frontier
    return result
def by_topic(self, topic: str) -> list[ArchiveEntry]:
    """List the entries carrying ``topic`` (compared case-insensitively)."""
    wanted = topic.lower()
    matches = []
    for entry in self._entries.values():
        lowered = {tag.lower() for tag in entry.topics}
        if wanted in lowered:
            matches.append(entry)
    return matches
def remove(self, entry_id: str) -> bool:
    """Delete an entry and strip its ID from other entries' link lists.

    Args:
        entry_id: ID of the entry to delete.

    Returns:
        True if the entry existed and was removed (the archive is saved),
        False if the ID was unknown (nothing is changed or saved).
    """
    if entry_id in self._entries:
        # Drop back-links first so no entry keeps pointing at the removed ID.
        for peer in self._entries.values():
            if entry_id in peer.links:
                peer.links.remove(entry_id)
        del self._entries[entry_id]
        self._save()
        return True
    return False
def export(
    self,
    query: Optional[str] = None,
    topics: Optional[list[str]] = None,
) -> dict:
    """Build a JSON-serialisable snapshot of (a subset of) the archive.

    Both filters are optional and combine with AND.

    Args:
        query: Whitespace-separated keywords; an entry is kept when any
            keyword occurs (case-insensitively) in its title, content or
            topics.
        topics: Topic tags; an entry is kept when it carries at least one
            of them (case-insensitive).

    Returns:
        A dict with keys ``version``, ``filters``, ``count`` and
        ``entries`` (each selected entry's ``to_dict()``).
    """
    selected = list(self._entries.values())

    if topics:
        wanted_topics = {t.lower() for t in topics}
        selected = [
            entry for entry in selected
            if any(tag.lower() in wanted_topics for tag in entry.topics)
        ]

    if query:
        needles = set(query.lower().split())
        kept = []
        for entry in selected:
            haystack = f"{entry.title} {entry.content} {' '.join(entry.topics)}".lower()
            if any(needle in haystack for needle in needles):
                kept.append(entry)
        selected = kept

    payload = {
        "version": _EXPORT_VERSION,
        "filters": {"query": query, "topics": topics},
        "count": len(selected),
        "entries": [entry.to_dict() for entry in selected],
    }
    return payload
def topic_counts(self) -> dict[str, int]:
    """Count how many entries carry each topic.

    Returns:
        Mapping of topic → number of occurrences, ordered from most to
        least frequent (ties keep first-seen order). Topics are counted
        exactly as spelled; no case folding is applied.
    """
    tally: dict[str, int] = {}
    for entry in self._entries.values():
        for topic in entry.topics:
            tally.setdefault(topic, 0)
            tally[topic] += 1
    ordered = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    return {topic: total for topic, total in ordered}
@property
def count(self) -> int:
    """Number of entries currently held in the archive."""
    total = len(self._entries)
    return total
def graph_data(
    self,
    topic_filter: Optional[str] = None,
) -> dict:
    """Produce node and edge lists describing the link graph.

    Args:
        topic_filter: When given, only entries carrying this topic
            (case-insensitive) become nodes, and only edges between two
            such entries are emitted.

    Returns:
        A dict with:
        - nodes: list of {id, title, topics, source, created_at}
        - edges: list of {source, target, weight}; each linked pair
          appears once, with ``source`` the smaller ID, and ``weight`` the
          linker's ``compute_similarity`` result rounded to 4 decimals.
    """
    if topic_filter:
        wanted = topic_filter.lower()
        entries = [
            e for e in self._entries.values()
            if wanted in [t.lower() for t in e.topics]
        ]
    else:
        entries = list(self._entries.values())
    entry_ids = {e.id for e in entries}

    nodes = []
    for e in entries:
        nodes.append(
            dict(
                id=e.id,
                title=e.title,
                topics=e.topics,
                source=e.source,
                created_at=e.created_at,
            )
        )

    # A→B and B→A collapse into one edge keyed by the ordered ID pair.
    emitted: set[tuple[str, str]] = set()
    edges = []
    for e in entries:
        for linked_id in e.links:
            if linked_id not in entry_ids:
                continue
            low_id, high_id = min(e.id, linked_id), max(e.id, linked_id)
            if (low_id, high_id) in emitted:
                continue
            emitted.add((low_id, high_id))
            linked = self._entries.get(linked_id)
            if not linked:
                continue
            # The weight is recomputed on demand rather than read from storage.
            weight = self.linker.compute_similarity(e, linked)
            edges.append({
                "source": low_id,
                "target": high_id,
                "weight": round(weight, 4),
            })

    return {"nodes": nodes, "edges": edges}
def stats(self) -> dict:
    """Summarise the archive: size, linkage, topics and age range.

    Returns:
        A dict with keys ``entries``, ``total_links``, ``unique_topics``,
        ``topics`` (sorted), ``orphans`` (entries with an empty link
        list), ``link_density`` (links per entry, 4 decimals, 0.0 when the
        archive is empty), ``oldest_entry`` and ``newest_entry`` (smallest
        and largest ``created_at`` value, or None when empty).
    """
    entries = list(self._entries.values())
    n = len(entries)

    total_links = 0
    orphans = 0
    topics: set[str] = set()
    for e in entries:
        link_count = len(e.links)
        total_links += link_count
        if link_count == 0:
            orphans += 1
        topics.update(e.topics)

    if n:
        link_density = round(total_links / n, 4)
    else:
        link_density = 0.0

    created = [e.created_at for e in entries]
    oldest_entry = min(created) if created else None
    newest_entry = max(created) if created else None

    return {
        "entries": n,
        "total_links": total_links,
        "unique_topics": len(topics),
        "topics": sorted(topics),
        "orphans": orphans,
        "link_density": link_density,
        "oldest_entry": oldest_entry,
        "newest_entry": newest_entry,
    }
def _build_adjacency(self) -> dict[str, set[str]]:
    """Turn the stored link lists into an undirected neighbour map.

    Every entry ID gets a (possibly empty) neighbour set. A link is
    recorded in both directions. Links to unknown IDs and links from an
    entry to itself are left out.
    """
    known = self._entries
    graph: dict[str, set[str]] = {eid: set() for eid in known}
    for eid, entry in known.items():
        for target in entry.links:
            if target == eid or target not in known:
                continue
            graph[eid].add(target)
            graph[target].add(eid)
    return graph
def graph_clusters(self, min_size: int = 1) -> list[dict]:
    """Find connected-component clusters in the holographic graph.

    Runs a breadth-first search over the undirected adjacency returned by
    ``_build_adjacency`` to group entries that can reach one another
    through links.

    Args:
        min_size: Smallest component size to report; smaller components
            are skipped (e.g. 2 filters out isolated entries).

    Returns:
        List of dicts sorted by ``size`` descending, each with keys:
        ``cluster_id`` (numbered in discovery order among reported
        clusters), ``size``, ``entries`` (member IDs in BFS order),
        ``top_topics`` (up to five most frequent topics),
        ``internal_edges`` and ``density`` (edges / possible edges,
        rounded to 4 decimals; 0.0 for a single-entry cluster).
    """
    # deque gives O(1) pops from the left; list.pop(0) shifts the whole list.
    from collections import deque

    adj = self._build_adjacency()
    visited: set[str] = set()
    clusters: list[dict] = []
    cluster_id = 0

    for start in self._entries:
        if start in visited:
            continue

        # BFS from this entry. IDs are marked when queued so none is
        # queued twice; visiting order matches marking on dequeue.
        visited.add(start)
        component: list[str] = []
        queue = deque([start])
        while queue:
            current = queue.popleft()
            component.append(current)
            for neighbor in adj.get(current, ()):
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)

        if len(component) < min_size:
            continue

        # Tally topics and sum degrees across the component.
        cluster_topics: dict[str, int] = {}
        internal_edges = 0
        for cid in component:
            entry = self._entries[cid]
            for t in entry.topics:
                cluster_topics[t] = cluster_topics.get(t, 0) + 1
            internal_edges += len(adj.get(cid, ()))
        internal_edges //= 2  # undirected, counted twice

        # Density: actual edges / possible edges
        n = len(component)
        max_edges = n * (n - 1) // 2
        density = round(internal_edges / max_edges, 4) if max_edges > 0 else 0.0

        top_topics = sorted(cluster_topics.items(), key=lambda x: x[1], reverse=True)[:5]
        clusters.append({
            "cluster_id": cluster_id,
            "size": n,
            "entries": component,
            "top_topics": [t for t, _ in top_topics],
            "internal_edges": internal_edges,
            "density": density,
        })
        cluster_id += 1

    clusters.sort(key=lambda c: c["size"], reverse=True)
    return clusters
def hub_entries(self, limit: int = 10) -> list[dict]:
    """Rank entries by how many distinct neighbours they have.

    Degree is taken from the undirected adjacency built by
    ``_build_adjacency``. Entries with no neighbours are left out.

    Args:
        limit: Maximum number of hubs to return.

    Returns:
        List of dicts with keys: entry, degree, inbound (number of links
        in the archive pointing at the entry), outbound (length of the
        entry's own link list) and topics — highest degree first.
    """
    adjacency = self._build_adjacency()

    inbound: dict[str, int] = dict.fromkeys(self._entries, 0)
    for source in self._entries.values():
        for target_id in source.links:
            if target_id in inbound:
                inbound[target_id] += 1

    hubs = []
    for eid, entry in self._entries.items():
        neighbours = adjacency.get(eid, set())
        if not neighbours:
            continue
        hubs.append({
            "entry": entry,
            "degree": len(neighbours),
            "inbound": inbound.get(eid, 0),
            "outbound": len(entry.links),
            "topics": entry.topics,
        })

    ranked = sorted(hubs, key=lambda hub: hub["degree"], reverse=True)
    return ranked[:limit]
def bridge_entries(self) -> list[dict]:
    """Find articulation points — entries whose removal would split a cluster.

    These are "bridge" entries in the holographic graph. Removing one
    disconnects members that were previously reachable through it.

    Articulation points are found with Tarjan's low-link algorithm, run
    as an explicit-stack DFS so large clusters cannot exhaust Python's
    recursion limit. The number of pieces left after removing an entry is
    derived during the same pass: each DFS child subtree that cannot reach
    above the entry becomes its own component.

    Returns:
        List of dicts with keys: entry, cluster_size,
        components_after_removal, topics — sorted by
        ``components_after_removal`` descending.
    """
    adj = self._build_adjacency()
    bridges: list[dict] = []

    # Fewer than three members: no entry can separate two others.
    for cluster in self.graph_clusters(min_size=3):
        members = set(cluster["entries"])
        if len(members) < 3:
            continue

        # Adjacency restricted to this cluster.
        sub_adj = {eid: adj[eid] & members for eid in cluster["entries"]}

        discovery: dict[str, int] = {}
        low: dict[str, int] = {}
        # Articulation point ID → number of components left once removed.
        components: dict[str, int] = {}
        timer = 0

        for root in sub_adj:
            if root in discovery:
                continue
            discovery[root] = low[root] = timer
            timer += 1
            root_children = 0
            # Each frame: (node, DFS parent, iterator over unexplored neighbours).
            stack = [(root, None, iter(sub_adj[root]))]
            while stack:
                node, parent, pending = stack[-1]
                descended = False
                for nxt in pending:
                    if nxt not in discovery:
                        discovery[nxt] = low[nxt] = timer
                        timer += 1
                        stack.append((nxt, node, iter(sub_adj[nxt])))
                        descended = True
                        break
                    if nxt != parent:
                        # Back edge: node can reach an earlier-discovered entry.
                        low[node] = min(low[node], discovery[nxt])
                if descended:
                    continue

                # node is fully explored; fold its result into its parent.
                stack.pop()
                if parent is None:
                    continue
                low[parent] = min(low[parent], low[node])
                if parent == root:
                    root_children += 1
                elif low[node] >= discovery[parent]:
                    # node's subtree cannot bypass parent: one more piece,
                    # on top of the piece containing the DFS root.
                    components[parent] = components.get(parent, 1) + 1

            # The root separates the cluster only with 2+ DFS children;
            # each child subtree is then its own component.
            if root_children > 1:
                components[root] = root_children

        for ap_id, component_count in components.items():
            ap_entry = self._entries[ap_id]
            bridges.append({
                "entry": ap_entry,
                "cluster_size": cluster["size"],
                "components_after_removal": component_count,
                "topics": ap_entry.topics,
            })

    bridges.sort(key=lambda b: b["components_after_removal"], reverse=True)
    return bridges
def add_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Append tags the entry does not already carry.

    Comparison is case-insensitive; the spelling of the first occurrence
    is kept. The archive is saved even when nothing new was added.

    Args:
        entry_id: ID of the entry to update.
        tags: Tags to add. Tags already present (ignoring case) are skipped.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)

    known = {topic.lower() for topic in entry.topics}
    for tag in tags:
        folded = tag.lower()
        if folded in known:
            continue
        known.add(folded)
        entry.topics.append(tag)

    self._save()
    return entry
def remove_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Strip the given tags from an entry, ignoring case.

    Args:
        entry_id: ID of the entry to update.
        tags: Tags to remove. Tags the entry does not carry are ignored.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)

    doomed = {tag.lower() for tag in tags}
    entry.topics = list(filter(lambda topic: topic.lower() not in doomed, entry.topics))

    self._save()
    return entry
def retag(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Overwrite an entry's tags with a de-duplicated copy of ``tags``.

    Args:
        entry_id: ID of the entry to update.
        tags: New tag list. Tags that differ only by case collapse to the
            first spelling encountered; order is otherwise preserved.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)

    # Dicts keep insertion order, so the first spelling of each tag wins.
    first_spelling: dict[str, str] = {}
    for tag in tags:
        first_spelling.setdefault(tag.lower(), tag)
    entry.topics = list(first_spelling.values())

    self._save()
    return entry
@staticmethod
def _parse_dt(dt_str: str) -> datetime:
    """Parse an ISO 8601 date/datetime string into an aware ``datetime``.

    A trailing ``Z`` (the UTC designator) is rewritten to ``+00:00``
    before parsing, because ``datetime.fromisoformat`` only accepts ``Z``
    natively from Python 3.11 on. Values carrying no timezone are assumed
    to be UTC.

    Args:
        dt_str: e.g. ``"2024-01-01"``, ``"2024-01-01T00:00:00"`` or
            ``"2024-01-01T00:00:00Z"``.

    Returns:
        A timezone-aware ``datetime``.

    Raises:
        ValueError: If the string is not a valid ISO date/datetime.
    """
    if dt_str.endswith("Z"):
        dt_str = dt_str[:-1] + "+00:00"
    dt = datetime.fromisoformat(dt_str)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt
def by_date_range(self, start: str, end: str) -> list[ArchiveEntry]:
    """Return entries whose ``created_at`` falls within [start, end] (inclusive).

    Both the range check and the ordering use the parsed, timezone-aware
    instants, so entries recorded with different UTC offsets are compared
    and sorted chronologically rather than by their string form.

    Args:
        start: ISO datetime string for the range start (e.g. "2024-01-01" or
            "2024-01-01T00:00:00Z"). Timezone-naive strings are treated as UTC.
        end: ISO datetime string for the range end. Timezone-naive strings
            are treated as UTC. A date-only value denotes midnight at the
            start of that day, so later times on that day fall outside.

    Returns:
        List of ArchiveEntry sorted by ``created_at`` ascending. Empty when
        ``start`` is later than ``end``.

    Raises:
        ValueError: If ``start``, ``end`` or a stored ``created_at`` is not
            a valid ISO date/datetime.
    """
    start_dt = self._parse_dt(start)
    end_dt = self._parse_dt(end)

    # Keep each parsed instant next to its entry: parse once, sort on it.
    dated = []
    for entry in self._entries.values():
        entry_dt = self._parse_dt(entry.created_at)
        if start_dt <= entry_dt <= end_dt:
            dated.append((entry_dt, entry))

    dated.sort(key=lambda pair: pair[0])
    return [entry for _, entry in dated]
def temporal_neighbors(self, entry_id: str, window_days: int = 7) -> list[ArchiveEntry]:
    """Return entries created within ``window_days`` of a given entry.

    The window is symmetric and inclusive: anchor time ± ``window_days``.
    The anchor entry itself is excluded. The window check and the ordering
    both use parsed, timezone-aware instants, so entries stored with
    different UTC offsets come back in true chronological order.

    Args:
        entry_id: ID of the anchor entry.
        window_days: Number of days around the anchor's ``created_at`` to search.

    Returns:
        List of ArchiveEntry sorted by ``created_at`` ascending.

    Raises:
        KeyError: If ``entry_id`` does not exist in the archive.
        ValueError: If a stored ``created_at`` is not a valid ISO
            date/datetime.
    """
    anchor = self._entries.get(entry_id)
    if anchor is None:
        raise KeyError(entry_id)

    anchor_dt = self._parse_dt(anchor.created_at)
    delta = timedelta(days=window_days)
    window_start = anchor_dt - delta
    window_end = anchor_dt + delta

    # Keep each parsed instant next to its entry: parse once, sort on it.
    dated = []
    for entry in self._entries.values():
        if entry.id == entry_id:
            continue
        entry_dt = self._parse_dt(entry.created_at)
        if window_start <= entry_dt <= window_end:
            dated.append((entry_dt, entry))

    dated.sort(key=lambda pair: pair[0])
    return [entry for _, entry in dated]
def rebuild_links(self, threshold: Optional[float] = None) -> int:
    """Recompute all links from scratch.

    Clears every entry's link list, then runs the holographic linker for
    each entry against all the others. Useful after bulk ingestion or
    threshold changes.

    Args:
        threshold: Temporary override for the linker's similarity
            threshold. The linker's previous threshold is restored before
            this method returns or raises.

    Returns:
        Sum of the values returned by the linker's ``apply_links`` calls
        (the number of links created).
    """
    override = threshold is not None
    if override:
        old_threshold = self.linker.threshold
        self.linker.threshold = threshold

    try:
        entries = list(self._entries.values())

        # Clear all links
        for entry in entries:
            entry.links = []

        # Re-link each entry against all others
        total_links = 0
        for entry in entries:
            candidates = [e for e in entries if e.id != entry.id]
            total_links += self.linker.apply_links(entry, candidates)
    finally:
        # Never leave the shared linker on the temporary threshold, even
        # if linking fails part-way through.
        if override:
            self.linker.threshold = old_threshold

    self._save()
    return total_links