Files
the-nexus/nexus/mnemosyne/archive.py
Alexander Whitestone cc1264140c feat(mnemosyne): implement discover() — serendipitous entry exploration (#1271)
- Added discover() method to archive.py (probabilistic, vitality-weighted)
- Added cmd_discover CLI handler with subparser
- Supports: -n COUNT, -t TOPIC, --vibrant flag
- prefer_fading=True surfaces neglected entries
2026-04-15 21:24:00 -04:00

1445 lines
53 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MnemosyneArchive — core archive class.
The living holographic archive. Stores entries, maintains links,
and provides query interfaces for retrieving connected knowledge.
"""
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.embeddings import get_embedding_backend, EmbeddingBackend
# Version string written to the "version" field of the dict returned by
# MnemosyneArchive.export().
_EXPORT_VERSION = "1"
class MnemosyneArchive:
"""The holographic archive — stores and links entries.
Phase 1 uses JSON file storage. Phase 2 will integrate with
MemPalace (ChromaDB) for vector-semantic search.
"""
def __init__(
    self,
    archive_path: Optional[Path] = None,
    embedding_backend: Optional[EmbeddingBackend] = None,
    auto_embed: bool = True,
):
    """Open (or create) an archive backed by a JSON file.

    Args:
        archive_path: Location of the JSON file. When falsy, defaults to
            ``~/.hermes/mnemosyne/archive.json``. The parent directory is
            created if needed.
        embedding_backend: Backend handed to the linker and used by
            semantic search. When None and ``auto_embed`` is True, one is
            requested from ``get_embedding_backend()``.
        auto_embed: Whether to look for a backend automatically.
    """
    if archive_path:
        self.path = archive_path
    else:
        self.path = Path.home() / ".hermes" / "mnemosyne" / "archive.json"
    self.path.parent.mkdir(parents=True, exist_ok=True)

    backend = embedding_backend
    if backend is None and auto_embed:
        # Best effort: any failure to obtain a backend means "run without one".
        try:
            backend = get_embedding_backend()
        except Exception:
            backend = None
    self._embedding_backend = backend

    self.linker = HolographicLinker(embedding_backend=self._embedding_backend)
    self._entries: dict[str, ArchiveEntry] = {}
    self._load()
def _load(self):
if self.path.exists():
try:
with open(self.path) as f:
data = json.load(f)
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
except (json.JSONDecodeError, KeyError):
pass # Start fresh on corrupt data
def _save(self):
data = {
"entries": [e.to_dict() for e in self._entries.values()],
"count": len(self._entries),
}
with open(self.path, "w") as f:
json.dump(data, f, indent=2)
def find_duplicate(self, entry: ArchiveEntry) -> Optional[ArchiveEntry]:
    """Look up a stored entry whose content hash equals ``entry``'s.

    An entry stored under the same id as ``entry`` is never reported.
    Returns the first match in storage order, or None.
    """
    return next(
        (
            stored
            for stored in self._entries.values()
            if stored.content_hash == entry.content_hash and stored.id != entry.id
        ),
        None,
    )
def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
    """Store ``entry`` and save the archive.

    When ``find_duplicate`` reports an existing entry with the same
    content hash, that entry is returned and nothing is stored or saved.
    Otherwise the entry is stored, optionally passed to the linker
    together with every stored entry, and the archive is saved.
    """
    already_stored = self.find_duplicate(entry)
    if already_stored is None:
        self._entries[entry.id] = entry
        if auto_link:
            pool = list(self._entries.values())
            self.linker.apply_links(entry, pool)
        self._save()
        return entry
    return already_stored
def update_entry(
    self,
    entry_id: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    metadata: Optional[dict] = None,
    auto_link: bool = True,
) -> ArchiveEntry:
    """Change the title, content and/or metadata of a stored entry.

    Args:
        entry_id: ID of the entry to modify.
        title: Replacement title; None keeps the current one.
        content: Replacement content; None keeps the current one.
        metadata: Keys to merge into the entry's metadata (existing keys
            are overwritten).
        auto_link: When the title or content actually changed, drop every
            link to and from this entry and run the linker again.

    Returns:
        The modified ArchiveEntry.

    Raises:
        KeyError: If no entry is stored under ``entry_id``.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)

    title_differs = title is not None and title != target.title
    if title_differs:
        target.title = title
    body_differs = content is not None and content != target.content
    if body_differs:
        target.content = content
    text_changed = title_differs or body_differs

    if metadata is not None:
        target.metadata.update(metadata)

    if text_changed:
        target.content_hash = _compute_content_hash(target.title, target.content)
    target.updated_at = datetime.now(timezone.utc).isoformat()

    if text_changed and auto_link:
        # Forget links pointing at this entry, then let the linker rebuild them.
        for peer in self._entries.values():
            if entry_id in peer.links:
                peer.links.remove(entry_id)
        target.links = []
        self.linker.apply_links(target, list(self._entries.values()))

    self._save()
    return target
def get(self, entry_id: str) -> Optional[ArchiveEntry]:
    """Return the entry stored under ``entry_id``, or None when absent."""
    found = self._entries.get(entry_id)
    return found
def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
    """Keyword search over each entry's title, content and topics.

    The query is lower-cased and split on whitespace. An entry scores one
    point per distinct query token that occurs as a substring of its
    lower-cased text. Entries with no hit are dropped; the rest are
    returned best first, at most ``limit`` of them.
    """
    needles = set(query.lower().split())
    ranked = []
    for candidate in self._entries.values():
        haystack = "{} {} {}".format(
            candidate.title, candidate.content, " ".join(candidate.topics)
        ).lower()
        match_count = len([needle for needle in needles if needle in haystack])
        if match_count > 0:
            ranked.append((match_count, candidate))
    ranked = sorted(ranked, key=lambda pair: pair[0], reverse=True)
    return [candidate for _, candidate in ranked[:limit]]
def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
    """Rank entries by similarity to ``query``, boosted by inbound links.

    Three strategies are tried in order; the first that yields any result
    wins:

    1. Embedding backend (if configured and it returns a query vector):
       backend similarity plus up to 0.15 for inbound links.
    2. Jaccard overlap of linker tokens plus up to 0.2 for inbound links.
       An empty tokenised query returns ``[]`` at this stage.
    3. Plain keyword ``search``.

    Args:
        query: Natural language query string.
        limit: Maximum number of results.
        threshold: Minimum raw similarity for strategies 1 and 2.

    Returns:
        Entries sorted by combined score, highest first.
    """
    # How many stored entries link to each entry (dangling ids ignored).
    inbound = dict.fromkeys(self._entries, 0)
    for source in self._entries.values():
        for target_id in source.links:
            if target_id in inbound:
                inbound[target_id] += 1
    max_inbound = max(inbound.values(), default=1) or 1

    def best_first(scored):
        ordered = sorted(scored, key=lambda pair: pair[0], reverse=True)
        return [entry for _, entry in ordered[:limit]]

    backend = self._embedding_backend
    if backend:
        query_vec = backend.embed(query)
        if query_vec:
            by_embedding = []
            for entry in self._entries.values():
                text = "{} {} {}".format(entry.title, entry.content, " ".join(entry.topics))
                entry_vec = backend.embed(text)
                if not entry_vec:
                    continue
                sim = backend.similarity(query_vec, entry_vec)
                if sim >= threshold:
                    link_boost = inbound[entry.id] / max_inbound * 0.15
                    by_embedding.append((sim + link_boost, entry))
            if by_embedding:
                return best_first(by_embedding)

    query_tokens = HolographicLinker._tokenize(query)
    if not query_tokens:
        return []
    by_tokens = []
    for entry in self._entries.values():
        entry_tokens = HolographicLinker._tokenize(
            "{} {} {}".format(entry.title, entry.content, " ".join(entry.topics))
        )
        if not entry_tokens:
            continue
        shared = query_tokens & entry_tokens
        combined = query_tokens | entry_tokens
        jaccard = len(shared) / len(combined)
        if jaccard >= threshold:
            link_boost = inbound[entry.id] / max_inbound * 0.2
            by_tokens.append((jaccard + link_boost, entry))
    if by_tokens:
        return best_first(by_tokens)

    return self.search(query, limit=limit)
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
    """Return the entries reachable from ``entry_id`` within ``depth`` hops.

    Follows outgoing ``links`` breadth-first. Each reachable entry appears
    exactly once, in the order it was first reached; the start entry is
    never included. Link ids that are not stored are skipped.

    Args:
        entry_id: ID of the entry to start from.
        depth: Maximum number of link hops to follow.

    Returns:
        List of linked ArchiveEntry objects (empty for an unknown id or
        ``depth <= 0``).
    """
    # Marking ids when first reached, not when expanded, keeps an entry
    # reachable by several routes from being reported more than once.
    seen = {entry_id}
    frontier = [entry_id]
    result = []
    for _ in range(depth):
        next_frontier = []
        for eid in frontier:
            entry = self._entries.get(eid)
            if entry is None:
                continue
            for linked_id in entry.links:
                if linked_id in seen:
                    continue
                linked = self._entries.get(linked_id)
                if linked is None:
                    continue
                seen.add(linked_id)
                result.append(linked)
                next_frontier.append(linked_id)
        if not next_frontier:
            break
        frontier = next_frontier
    return result
def by_topic(self, topic: str) -> list[ArchiveEntry]:
    """Return every entry carrying ``topic`` (compared case-insensitively)."""
    wanted = topic.lower()
    matches = []
    for candidate in self._entries.values():
        lowered = [tag.lower() for tag in candidate.topics]
        if wanted in lowered:
            matches.append(candidate)
    return matches
def remove(self, entry_id: str) -> bool:
    """Delete an entry and the links other entries hold to it.

    Returns:
        True when the entry existed (the archive is saved afterwards),
        False when ``entry_id`` is unknown (nothing is changed or saved).
    """
    if entry_id not in self._entries:
        return False
    for peer in self._entries.values():
        # list.remove drops one occurrence and raises when there is none.
        try:
            peer.links.remove(entry_id)
        except ValueError:
            pass
    self._entries.pop(entry_id)
    self._save()
    return True
def export(
    self,
    query: Optional[str] = None,
    topics: Optional[list[str]] = None,
) -> dict:
    """Build a JSON-serialisable dump of all entries or a filtered subset.

    Args:
        query: Whitespace-separated keywords; an entry passes when any
            keyword occurs in its lower-cased title, content or topics.
        topics: Topic tags; an entry passes when it carries at least one
            of them (case-insensitive).

    Returns:
        Dict with ``version``, the ``filters`` that were applied, the
        ``count`` of exported entries and the ``entries`` themselves as
        dicts.
    """
    selected = list(self._entries.values())

    if topics:
        wanted = {name.lower() for name in topics}
        selected = [
            entry
            for entry in selected
            if not wanted.isdisjoint(tag.lower() for tag in entry.topics)
        ]

    if query:
        needles = set(query.lower().split())

        def mentions_any(entry):
            haystack = "{} {} {}".format(
                entry.title, entry.content, " ".join(entry.topics)
            ).lower()
            return any(needle in haystack for needle in needles)

        selected = [entry for entry in selected if mentions_any(entry)]

    return {
        "version": _EXPORT_VERSION,
        "filters": {"query": query, "topics": topics},
        "count": len(selected),
        "entries": [entry.to_dict() for entry in selected],
    }
def topic_counts(self) -> dict[str, int]:
    """Count how many entries carry each topic.

    Returns:
        Dict of topic → entry count, ordered from most to least used.
        Topics are counted exactly as spelled (case-sensitive).
    """
    tally: dict[str, int] = {}
    for entry in self._entries.values():
        for name in entry.topics:
            if name in tally:
                tally[name] += 1
            else:
                tally[name] = 1
    ordered = sorted(tally.items(), key=lambda item: item[1], reverse=True)
    return {name: total for name, total in ordered}
@property
def count(self) -> int:
    """Number of entries currently held in the archive."""
    total = len(self._entries)
    return total
def graph_data(
    self,
    topic_filter: Optional[str] = None,
) -> dict:
    """Export entries and their links as a node/edge graph.

    Args:
        topic_filter: When set, keep only entries carrying this topic
            (case-insensitive) and the edges between them.

    Returns:
        Dict with:
        - ``nodes``: one ``{id, title, topics, source, created_at}`` dict
          per entry.
        - ``edges``: one ``{source, target, weight}`` dict per linked
          pair. A→B and B→A collapse into one edge whose endpoints are
          ordered by id; ``weight`` is the linker's similarity for the
          pair, rounded to 4 decimals.
    """
    selected = list(self._entries.values())
    if topic_filter:
        wanted = topic_filter.lower()
        selected = [
            entry
            for entry in selected
            if wanted in [tag.lower() for tag in entry.topics]
        ]
    selected_ids = {entry.id for entry in selected}

    nodes = []
    for entry in selected:
        nodes.append(
            {
                "id": entry.id,
                "title": entry.title,
                "topics": entry.topics,
                "source": entry.source,
                "created_at": entry.created_at,
            }
        )

    edges = []
    emitted: set[tuple[str, str]] = set()
    for entry in selected:
        for linked_id in entry.links:
            if linked_id not in selected_ids:
                continue
            low_id, high_id = sorted((entry.id, linked_id))
            if (low_id, high_id) in emitted:
                continue
            emitted.add((low_id, high_id))
            # Weight is computed live rather than read from stored data.
            partner = self._entries.get(linked_id)
            if partner:
                weight = self.linker.compute_similarity(entry, partner)
                edges.append(
                    {"source": low_id, "target": high_id, "weight": round(weight, 4)}
                )
    return {"nodes": nodes, "edges": edges}
def stats(self) -> dict:
    """Summarise the archive: size, linkage, topics, age and vitality.

    Returns:
        Dict with keys ``entries``, ``total_links``, ``unique_topics``,
        ``topics`` (sorted), ``orphans`` (entries with no outgoing links),
        ``link_density`` (links per entry), ``oldest_entry`` /
        ``newest_entry`` (smallest / largest ``created_at`` value, None
        when empty), ``avg_vitality``, ``fading_count`` (vitality < 0.3)
        and ``vibrant_count`` (vitality > 0.7).
    """
    entries = list(self._entries.values())
    n = len(entries)

    link_counts = [len(entry.links) for entry in entries]
    total_links = sum(link_counts)
    orphans = link_counts.count(0)
    link_density = round(total_links / n, 4) if n else 0.0

    topics: set[str] = set()
    for entry in entries:
        topics.update(entry.topics)

    created = [entry.created_at for entry in entries]
    oldest_entry = min(created) if created else None
    newest_entry = max(created) if created else None

    avg_vitality = 0.0
    fading_count = 0
    vibrant_count = 0
    if n > 0:
        vitalities = [self._compute_vitality(entry) for entry in entries]
        avg_vitality = round(sum(vitalities) / n, 4)
        fading_count = len([v for v in vitalities if v < 0.3])
        vibrant_count = len([v for v in vitalities if v > 0.7])

    return {
        "entries": n,
        "total_links": total_links,
        "unique_topics": len(topics),
        "topics": sorted(topics),
        "orphans": orphans,
        "link_density": link_density,
        "oldest_entry": oldest_entry,
        "newest_entry": newest_entry,
        "avg_vitality": avg_vitality,
        "fading_count": fading_count,
        "vibrant_count": vibrant_count,
    }
def _build_adjacency(self) -> dict[str, set[str]]:
"""Build adjacency dict from entry links. Only includes valid references."""
adj: dict[str, set[str]] = {eid: set() for eid in self._entries}
for eid, entry in self._entries.items():
for linked_id in entry.links:
if linked_id in self._entries and linked_id != eid:
adj[eid].add(linked_id)
adj[linked_id].add(eid)
return adj
def graph_clusters(self, min_size: int = 1) -> list[dict]:
    """Find connected-component clusters in the link graph.

    Entries belong to the same cluster when they can reach each other
    through links, treated as undirected.

    Args:
        min_size: Smallest cluster to report; e.g. 2 drops entries that
            have no links at all.

    Returns:
        List of dicts sorted by ``size`` descending, each with keys:
        ``cluster_id``, ``size``, ``entries`` (member ids in breadth-first
        order), ``top_topics`` (up to five most frequent topics),
        ``internal_edges`` (undirected edge count) and ``density``
        (edges divided by the maximum possible, 0.0 for a single entry).
    """
    adj = self._build_adjacency()
    visited: set[str] = set()
    clusters: list[dict] = []
    cluster_id = 0

    for eid in self._entries:
        if eid in visited:
            continue
        # Breadth-first walk. The component list doubles as the queue and
        # a cursor replaces list.pop(0), which shifts the whole list.
        visited.add(eid)
        component: list[str] = [eid]
        cursor = 0
        while cursor < len(component):
            current = component[cursor]
            cursor += 1
            for neighbor in adj.get(current, ()):
                if neighbor not in visited:
                    visited.add(neighbor)
                    component.append(neighbor)

        if len(component) < min_size:
            continue

        cluster_topics: dict[str, int] = {}
        internal_edges = 0
        for cid in component:
            for t in self._entries[cid].topics:
                cluster_topics[t] = cluster_topics.get(t, 0) + 1
            internal_edges += len(adj.get(cid, ()))
        internal_edges //= 2  # each undirected edge was counted from both ends

        n = len(component)
        max_edges = n * (n - 1) // 2
        density = round(internal_edges / max_edges, 4) if max_edges > 0 else 0.0
        top_topics = sorted(cluster_topics.items(), key=lambda x: x[1], reverse=True)[:5]

        clusters.append({
            "cluster_id": cluster_id,
            "size": n,
            "entries": component,
            "top_topics": [t for t, _ in top_topics],
            "internal_edges": internal_edges,
            "density": density,
        })
        cluster_id += 1

    clusters.sort(key=lambda c: c["size"], reverse=True)
    return clusters
def hub_entries(self, limit: int = 10) -> list[dict]:
    """Rank entries by how many distinct neighbours they have.

    Args:
        limit: Maximum number of hubs to return.

    Returns:
        List of dicts, highest ``degree`` first, with keys ``entry``,
        ``degree`` (undirected neighbour count), ``inbound`` (links from
        other stored entries), ``outbound`` (length of the entry's own
        link list) and ``topics``. Entries without neighbours are omitted.
    """
    adjacency = self._build_adjacency()

    incoming = dict.fromkeys(self._entries, 0)
    for source in self._entries.values():
        for target_id in source.links:
            if target_id in incoming:
                incoming[target_id] += 1

    ranked = []
    for eid, entry in self._entries.items():
        neighbour_count = len(adjacency.get(eid, set()))
        if neighbour_count == 0:
            continue
        ranked.append(
            {
                "entry": entry,
                "degree": neighbour_count,
                "inbound": incoming.get(eid, 0),
                "outbound": len(entry.links),
                "topics": entry.topics,
            }
        )
    ranked = sorted(ranked, key=lambda hub: hub["degree"], reverse=True)
    return ranked[:limit]
def bridge_entries(self) -> list[dict]:
    """Find articulation points — entries whose removal would split a cluster.

    Only clusters with at least three entries are examined. Candidates
    come from Tarjan's articulation-point algorithm, run with an explicit
    stack so cluster size is not limited by the interpreter's recursion
    depth. Each candidate is then confirmed by counting the components
    left in its cluster once it is taken out.

    Returns:
        List of dicts sorted by ``components_after_removal`` descending
        (ties in entry-id order per cluster), each with keys ``entry``,
        ``cluster_size``, ``components_after_removal`` and ``topics``.
    """
    adj = self._build_adjacency()
    clusters = self.graph_clusters(min_size=3)
    if not clusters:
        return []

    def articulation_points(sub_adj: dict[str, set[str]]) -> set[str]:
        """Return the articulation points of ``sub_adj`` (iterative Tarjan DFS)."""
        discovery: dict[str, int] = {}
        low: dict[str, int] = {}
        found: set[str] = set()
        timer = 0
        for root in sub_adj:
            if root in discovery:
                continue
            discovery[root] = low[root] = timer
            timer += 1
            root_children = 0
            # One frame per node on the DFS path: (node, parent, pending neighbours).
            stack = [(root, None, iter(sub_adj[root]))]
            while stack:
                node, parent, pending = stack[-1]
                descended = False
                for nxt in pending:
                    if nxt not in discovery:
                        discovery[nxt] = low[nxt] = timer
                        timer += 1
                        if len(stack) == 1:
                            root_children += 1
                        stack.append((nxt, node, iter(sub_adj[nxt])))
                        descended = True
                        break
                    if nxt != parent:
                        # Back edge: node can reach an earlier-discovered node.
                        low[node] = min(low[node], discovery[nxt])
                if descended:
                    continue
                stack.pop()
                if stack:
                    above = stack[-1][0]
                    low[above] = min(low[above], low[node])
                    # A non-root node cuts the graph when a child's subtree
                    # cannot reach anything discovered before that node.
                    if len(stack) > 1 and low[node] >= discovery[above]:
                        found.add(above)
            # The DFS root cuts the graph only when it has 2+ DFS children.
            if root_children > 1:
                found.add(root)
        return found

    bridges: list[dict] = []
    for cluster in clusters:
        members = set(cluster["entries"])
        if len(members) < 3:
            continue
        sub_adj = {eid: adj[eid] & members for eid in members}

        for ap_id in sorted(articulation_points(sub_adj)):
            ap_entry = self._entries[ap_id]
            # Count components of the cluster with ap_id left out.
            seen: set[str] = set()
            component_count = 0
            for start in members:
                if start == ap_id or start in seen:
                    continue
                component_count += 1
                seen.add(start)
                pending_ids = [start]
                while pending_ids:
                    cur = pending_ids.pop()
                    for nb in sub_adj[cur]:
                        if nb != ap_id and nb not in seen:
                            seen.add(nb)
                            pending_ids.append(nb)
            if component_count > 1:
                bridges.append({
                    "entry": ap_entry,
                    "cluster_size": cluster["size"],
                    "components_after_removal": component_count,
                    "topics": ap_entry.topics,
                })

    bridges.sort(key=lambda b: b["components_after_removal"], reverse=True)
    return bridges
def add_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Append tags an entry does not already carry, then save.

    Tags are compared case-insensitively but stored as given; the first
    spelling of a tag wins.

    Args:
        entry_id: ID of the entry to update.
        tags: Candidate tags to add.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)
    known = {existing.lower() for existing in target.topics}
    for tag in tags:
        folded = tag.lower()
        if folded in known:
            continue
        target.topics.append(tag)
        known.add(folded)
    self._save()
    return target
def remove_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Drop the given tags from an entry (case-insensitive), then save.

    Args:
        entry_id: ID of the entry to update.
        tags: Tags to drop; ones the entry does not carry are ignored.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)
    doomed = {tag.lower() for tag in tags}
    survivors = []
    for existing in target.topics:
        if existing.lower() not in doomed:
            survivors.append(existing)
    target.topics = survivors
    self._save()
    return target
def retag(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
    """Replace an entry's tags with ``tags``, then save.

    Tags that differ only by case collapse to the first spelling given;
    the original order is kept.

    Args:
        entry_id: ID of the entry to update.
        tags: The new tag list.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)
    # Dicts keep insertion order, so the first spelling of each tag wins.
    first_spelling: dict[str, str] = {}
    for tag in tags:
        first_spelling.setdefault(tag.lower(), tag)
    target.topics = list(first_spelling.values())
    self._save()
    return target
@staticmethod
def _parse_dt(dt_str: str) -> datetime:
"""Parse an ISO datetime string. Assumes UTC if no timezone is specified."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def by_date_range(self, start: str, end: str) -> list[ArchiveEntry]:
    """Return entries whose ``created_at`` falls within [start, end] (inclusive).

    Args:
        start: ISO datetime string for the range start (e.g. "2024-01-01").
            Timezone-naive strings are treated as UTC.
        end: ISO datetime string for the range end. Timezone-naive strings
            are treated as UTC. A date-only value means midnight at the
            start of that day.

    Returns:
        List of ArchiveEntry in chronological order. Ordering uses the
        parsed timestamps, so entries recorded with different UTC offsets
        are still ordered by the instant they denote.
    """
    start_dt = self._parse_dt(start)
    end_dt = self._parse_dt(end)
    dated = []
    for entry in self._entries.values():
        entry_dt = self._parse_dt(entry.created_at)
        if start_dt <= entry_dt <= end_dt:
            dated.append((entry_dt, entry))
    # Sort on the parsed instant, not the raw string.
    dated.sort(key=lambda pair: pair[0])
    return [entry for _, entry in dated]
def temporal_neighbors(self, entry_id: str, window_days: int = 7) -> list[ArchiveEntry]:
    """Return entries created within ``window_days`` of a given entry.

    The window is symmetric and inclusive: from ``window_days`` before to
    ``window_days`` after the anchor's ``created_at``. The anchor itself
    is excluded.

    Args:
        entry_id: ID of the anchor entry.
        window_days: Number of days around the anchor's ``created_at`` to search.

    Returns:
        List of ArchiveEntry in chronological order (by parsed timestamp,
        so differing UTC offsets do not disturb the order).

    Raises:
        KeyError: If ``entry_id`` does not exist in the archive.
    """
    anchor = self._entries.get(entry_id)
    if anchor is None:
        raise KeyError(entry_id)
    anchor_dt = self._parse_dt(anchor.created_at)
    delta = timedelta(days=window_days)
    window_start = anchor_dt - delta
    window_end = anchor_dt + delta

    dated = []
    for entry in self._entries.values():
        if entry.id == entry_id:
            continue
        entry_dt = self._parse_dt(entry.created_at)
        if window_start <= entry_dt <= window_end:
            dated.append((entry_dt, entry))
    # Sort on the parsed instant, not the raw string.
    dated.sort(key=lambda pair: pair[0])
    return [entry for _, entry in dated]
# ─── Memory Decay ─────────────────────────────────────────
# Decay parameters (class-level tunables read by touch() and _compute_vitality()).
# Days without access after which _compute_vitality() halves an entry's vitality.
_DECAY_HALF_LIFE_DAYS: float = 30.0 # Half-life for exponential decay
# touch() raises vitality by this fraction of the remaining headroom (1 - vitality).
_TOUCH_BOOST_FACTOR: float = 0.1 # Base boost on access (diminishes as vitality → 1.0)
def touch(self, entry_id: str) -> ArchiveEntry:
    """Register an access: boost the entry's vitality and stamp the time.

    The stored vitality becomes the currently decayed vitality plus
    ``_TOUCH_BOOST_FACTOR`` times the remaining headroom, capped at 1.0.
    ``last_accessed`` is set to the current UTC time and the archive is
    saved.

    Args:
        entry_id: ID of the entry to touch.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)
    accessed_at = datetime.now(timezone.utc).isoformat()
    # Decay is measured against the old timestamp, so compute it before
    # last_accessed is overwritten.
    decayed_now = self._compute_vitality(target)
    headroom = 1.0 - decayed_now
    gain = self._TOUCH_BOOST_FACTOR * headroom
    target.vitality = min(1.0, decayed_now + gain)
    target.last_accessed = accessed_at
    self._save()
    return target
def _compute_vitality(self, entry: ArchiveEntry) -> float:
"""Compute the current vitality of an entry based on time decay.
Uses exponential decay: ``v = base * 0.5 ^ (hours_since_access / half_life_hours)``
If the entry has never been accessed, uses ``created_at`` as the
reference point. New entries with no access start at full vitality.
Args:
entry: The archive entry.
Returns:
Current vitality as a float in [0.0, 1.0].
"""
if entry.last_accessed is None:
# Never accessed — check age from creation
created = self._parse_dt(entry.created_at)
hours_elapsed = (datetime.now(timezone.utc) - created).total_seconds() / 3600
else:
last = self._parse_dt(entry.last_accessed)
hours_elapsed = (datetime.now(timezone.utc) - last).total_seconds() / 3600
half_life_hours = self._DECAY_HALF_LIFE_DAYS * 24
if hours_elapsed <= 0 or half_life_hours <= 0:
return entry.vitality
decayed = entry.vitality * (0.5 ** (hours_elapsed / half_life_hours))
return max(0.0, min(1.0, decayed))
def get_vitality(self, entry_id: str) -> dict:
    """Report the current vitality of one entry.

    Args:
        entry_id: ID of the entry.

    Returns:
        Dict with keys ``entry_id``, ``title``, ``vitality`` (decayed
        value rounded to 4 decimals), ``last_accessed`` and ``age_days``
        (whole days since ``created_at``).

    Raises:
        KeyError: If entry_id does not exist.
    """
    target = self._entries.get(entry_id)
    if target is None:
        raise KeyError(entry_id)
    vitality_now = self._compute_vitality(target)
    born = self._parse_dt(target.created_at)
    age = datetime.now(timezone.utc) - born
    report = {
        "entry_id": target.id,
        "title": target.title,
        "vitality": round(vitality_now, 4),
        "last_accessed": target.last_accessed,
        "age_days": age.days,
    }
    return report
def fading(self, limit: int = 10) -> list[dict]:
    """List the entries with the lowest current vitality.

    Args:
        limit: Maximum number of entries to return.

    Returns:
        List of dicts, lowest vitality first, each with keys ``entry_id``,
        ``title``, ``vitality`` (rounded to 4 decimals), ``last_accessed``
        and ``age_days``.
    """
    rows = []
    for entry in self._entries.values():
        vitality_now = self._compute_vitality(entry)
        born = self._parse_dt(entry.created_at)
        rows.append(
            dict(
                entry_id=entry.id,
                title=entry.title,
                vitality=round(vitality_now, 4),
                last_accessed=entry.last_accessed,
                age_days=(datetime.now(timezone.utc) - born).days,
            )
        )
    return sorted(rows, key=lambda row: row["vitality"])[:limit]
def vibrant(self, limit: int = 10) -> list[dict]:
    """List the entries with the highest current vitality.

    Args:
        limit: Maximum number of entries to return.

    Returns:
        List of dicts, highest vitality first, each with keys
        ``entry_id``, ``title``, ``vitality`` (rounded to 4 decimals),
        ``last_accessed`` and ``age_days``.
    """
    rows = []
    for entry in self._entries.values():
        vitality_now = self._compute_vitality(entry)
        born = self._parse_dt(entry.created_at)
        rows.append(
            dict(
                entry_id=entry.id,
                title=entry.title,
                vitality=round(vitality_now, 4),
                last_accessed=entry.last_accessed,
                age_days=(datetime.now(timezone.utc) - born).days,
            )
        )
    return sorted(rows, key=lambda row: row["vitality"], reverse=True)[:limit]
def apply_decay(self) -> dict:
    """Write each entry's decayed vitality back to it and save the archive.

    An entry counts as decayed when its recomputed vitality differs from
    the stored one by more than 1e-6.

    NOTE(review): the decayed value is stored without moving the
    timestamp that ``_compute_vitality`` measures from, so calling this
    repeatedly appears to compound the decay — confirm this is intended.

    Returns:
        Dict with keys ``total_entries``, ``decayed_count``,
        ``avg_vitality``, ``fading_count`` (vitality below 0.3) and
        ``vibrant_count`` (vitality above 0.7).
    """
    changed = 0
    vitality_sum = 0.0
    low = 0
    high = 0
    for entry in self._entries.values():
        before = entry.vitality
        after = self._compute_vitality(entry)
        if abs(after - before) > 1e-6:
            entry.vitality = after
            changed += 1
        stored = entry.vitality
        vitality_sum += stored
        if stored < 0.3:
            low += 1
        if stored > 0.7:
            high += 1

    total = len(self._entries)
    self._save()
    average = round(vitality_sum / total, 4) if total else 0.0
    return {
        "total_entries": total,
        "decayed_count": changed,
        "avg_vitality": average,
        "fading_count": low,
        "vibrant_count": high,
    }
def consolidate(
    self,
    threshold: float = 0.9,
    dry_run: bool = False,
) -> list[dict]:
    """Scan the archive and merge duplicate/near-duplicate entries.

    Two entries are considered duplicates if:
    - They share the same non-None ``content_hash`` (exact duplicate), or
    - Their linker similarity score is at least ``threshold``.

    Merge strategy:
    - Keep the entry with the smaller ``created_at`` value (the older one).
    - Union topics from both entries (case-deduped).
    - Merge metadata from the removed entry (kept values win on conflicts).
    - Transfer the removed entry's links to the kept entry and re-point
      links that targeted the removed entry.
    - Delete the removed entry.

    An entry that has been merged away is never compared again, whether it
    was the first or the second member of the pair.

    Args:
        threshold: Similarity threshold for near-duplicate detection
            (0.0-1.0). Default 0.9 is intentionally conservative.
        dry_run: If True, return the list of would-be merges without
            mutating or saving the archive.

    Returns:
        List of dicts, one per merged pair::

            {
                "kept": <entry_id of survivor>,
                "removed": <entry_id of duplicate>,
                "reason": "exact_hash" | "semantic_similarity",
                "score": float,   # 1.0 for exact hash matches
                "dry_run": bool,
            }
    """
    merges: list[dict] = []
    entries = list(self._entries.values())
    removed_ids: set[str] = set()

    for i, entry_a in enumerate(entries):
        if entry_a.id in removed_ids:
            continue
        for entry_b in entries[i + 1:]:
            if entry_b.id in removed_ids:
                continue

            # Determine if they are duplicates
            reason: Optional[str] = None
            score: float = 0.0
            if (
                entry_a.content_hash is not None
                and entry_b.content_hash is not None
                and entry_a.content_hash == entry_b.content_hash
            ):
                reason = "exact_hash"
                score = 1.0
            else:
                sim = self.linker.compute_similarity(entry_a, entry_b)
                if sim >= threshold:
                    reason = "semantic_similarity"
                    score = sim
            if reason is None:
                continue

            # Decide which entry to keep (older survives)
            if entry_a.created_at <= entry_b.created_at:
                kept, removed = entry_a, entry_b
            else:
                kept, removed = entry_b, entry_a

            merges.append({
                "kept": kept.id,
                "removed": removed.id,
                "reason": reason,
                "score": round(score, 4),
                "dry_run": dry_run,
            })

            if not dry_run:
                # Merge topics (case-deduped)
                existing_lower = {t.lower() for t in kept.topics}
                for tag in removed.topics:
                    if tag.lower() not in existing_lower:
                        kept.topics.append(tag)
                        existing_lower.add(tag.lower())

                # Merge metadata (kept wins on key conflicts)
                for k, v in removed.metadata.items():
                    if k not in kept.metadata:
                        kept.metadata[k] = v

                # Transfer links: add removed's links to kept
                kept_links_set = set(kept.links)
                for lid in removed.links:
                    if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
                        kept.links.append(lid)
                        kept_links_set.add(lid)
                        # Update the other entry's back-link
                        other = self._entries.get(lid)
                        if other and kept.id not in other.links:
                            other.links.append(kept.id)

                # Re-point links that targeted the removed entry
                for other in self._entries.values():
                    if removed.id in other.links:
                        other.links.remove(removed.id)
                        if other.id != kept.id and kept.id not in other.links:
                            other.links.append(kept.id)

                del self._entries[removed.id]

            # Tracked in dry-run mode too, so the report lists the same
            # pairs a real run would merge.
            removed_ids.add(removed.id)

            if removed is entry_a:
                # entry_a is gone. Comparing it further would merge into a
                # deleted entry or try to delete it a second time.
                break

    if not dry_run and merges:
        self._save()
    return merges
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
    """Find a shortest chain of links between two entries.

    Links are treated as undirected and unweighted; the search is
    breadth-first.

    Returns:
        Entry ids from ``start_id`` to ``end_id`` inclusive, or None when
        either id is unknown or the two are not connected. A known entry's
        path to itself is ``[start_id]``.
    """
    if start_id == end_id:
        return [start_id] if start_id in self._entries else None
    if start_id not in self._entries or end_id not in self._entries:
        return None

    adj = self._build_adjacency()
    # came_from doubles as the visited set and lets the route be rebuilt
    # backwards once the target is seen.
    came_from = {start_id: None}
    order = [start_id]
    cursor = 0
    while cursor < len(order):
        here = order[cursor]
        cursor += 1
        for nxt in adj.get(here, []):
            if nxt == end_id:
                route = [end_id]
                step = here
                while step is not None:
                    route.append(step)
                    step = came_from[step]
                route.reverse()
                return route
            if nxt not in came_from:
                came_from[nxt] = here
                order.append(nxt)
    return None
def path_explanation(self, path: list[str]) -> list[dict]:
    """Convert a path of entry IDs into human-readable step descriptions.

    Returns:
        One dict per id with keys ``id``, ``title``, ``topics`` and
        ``content_preview`` (the content, cut to 120 characters plus
        "..." when longer). Ids that are not in the archive get the title
        "[unknown]", no topics and an empty preview, so every step has the
        same keys.
    """
    steps = []
    for entry_id in path:
        entry = self._entries.get(entry_id)
        if entry:
            content = entry.content
            preview = content[:120] + "..." if len(content) > 120 else content
            steps.append({
                "id": entry.id,
                "title": entry.title,
                "topics": entry.topics,
                "content_preview": preview,
            })
        else:
            steps.append({
                "id": entry_id,
                "title": "[unknown]",
                "topics": [],
                "content_preview": "",
            })
    return steps
# ─── Snapshot / Backup ────────────────────────────────────
def _snapshot_dir(self) -> Path:
"""Return (and create) the snapshots directory next to the archive."""
d = self.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _snapshot_filename(timestamp: str, label: str) -> str:
"""Build a deterministic snapshot filename."""
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
return f"{timestamp}_{safe_label}.json"
def snapshot_create(self, label: str = "") -> dict:
    """Write the current entries to a new timestamped snapshot file.

    The file is named from the current UTC time (second resolution) and
    the sanitised label, and is placed in the snapshots directory.

    Args:
        label: Optional human-readable label for the snapshot.

    Returns:
        Dict with keys ``snapshot_id``, ``label``, ``created_at``,
        ``entry_count`` and ``path``.
    """
    moment = datetime.now(timezone.utc)
    filename = self._snapshot_filename(moment.strftime("%Y%m%d_%H%M%S"), label)
    snapshot_id = filename[:-5]  # strip .json
    destination = self._snapshot_dir() / filename

    summary = {
        "snapshot_id": snapshot_id,
        "label": label,
        "created_at": moment.isoformat(),
        "entry_count": len(self._entries),
    }
    payload = dict(summary)
    payload["archive_path"] = str(self.path)
    payload["entries"] = [entry.to_dict() for entry in self._entries.values()]

    with open(destination, "w") as handle:
        json.dump(payload, handle, indent=2)

    summary["path"] = str(destination)
    return summary
def snapshot_list(self) -> list[dict]:
"""List available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
"""
snap_dir = self._snapshot_dir()
snapshots = []
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
try:
with open(snap_path) as f:
data = json.load(f)
snapshots.append({
"snapshot_id": data.get("snapshot_id", snap_path.stem),
"label": data.get("label", ""),
"created_at": data.get("created_at", ""),
"entry_count": data.get("entry_count", len(data.get("entries", []))),
"path": str(snap_path),
})
except (json.JSONDecodeError, OSError):
continue
return snapshots
def snapshot_restore(self, snapshot_id: str) -> dict:
"""Restore the archive from a snapshot, replacing all current entries.
Args:
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
Returns:
Dict with keys: snapshot_id, restored_count, previous_count
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
previous_count = len(self._entries)
self._entries = {}
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
self._save()
return {
"snapshot_id": snapshot_id,
"restored_count": len(self._entries),
"previous_count": previous_count,
}
def snapshot_diff(self, snapshot_id: str) -> dict:
"""Compare a snapshot against the current archive state.
Args:
snapshot_id: The snapshot_id to compare against current state.
Returns:
Dict with keys:
- snapshot_id: str
- added: list of {id, title} — in current, not in snapshot
- removed: list of {id, title} — in snapshot, not in current
- modified: list of {id, title, snapshot_hash, current_hash}
- unchanged: int — count of identical entries
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
snap_entries: dict[str, dict] = {}
for entry_data in data.get("entries", []):
snap_entries[entry_data["id"]] = entry_data
current_ids = set(self._entries.keys())
snap_ids = set(snap_entries.keys())
added = []
for eid in current_ids - snap_ids:
e = self._entries[eid]
added.append({"id": e.id, "title": e.title})
removed = []
for eid in snap_ids - current_ids:
snap_e = snap_entries[eid]
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
modified = []
unchanged = 0
for eid in current_ids & snap_ids:
current_hash = self._entries[eid].content_hash
snap_hash = snap_entries[eid].get("content_hash")
if current_hash != snap_hash:
modified.append({
"id": eid,
"title": self._entries[eid].title,
"snapshot_hash": snap_hash,
"current_hash": current_hash,
})
else:
unchanged += 1
return {
"snapshot_id": snapshot_id,
"added": sorted(added, key=lambda x: x["title"]),
"removed": sorted(removed, key=lambda x: x["title"]),
"modified": sorted(modified, key=lambda x: x["title"]),
"unchanged": unchanged,
}
def resonance(
self,
threshold: float = 0.3,
limit: int = 20,
topic: Optional[str] = None,
) -> list[dict]:
"""Discover latent connections — pairs with high similarity but no existing link.
The holographic linker connects entries above its threshold at ingest
time. ``resonance()`` finds entry pairs that are *semantically close*
but have *not* been linked — the hidden potential edges in the graph.
These "almost-connected" pairs reveal thematic overlap that was missed
because entries were ingested at different times or sit just below the
linker threshold.
Args:
threshold: Minimum similarity score to surface a pair (default 0.3).
Pairs already linked are excluded regardless of score.
limit: Maximum number of pairs to return (default 20).
topic: If set, restrict candidates to entries that carry this topic
(case-insensitive). Both entries in a pair must match.
Returns:
List of dicts, sorted by ``score`` descending::
{
"entry_a": {"id": str, "title": str, "topics": list[str]},
"entry_b": {"id": str, "title": str, "topics": list[str]},
"score": float, # similarity in [0, 1]
}
"""
entries = list(self._entries.values())
if topic:
topic_lower = topic.lower()
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
results: list[dict] = []
for i, entry_a in enumerate(entries):
for entry_b in entries[i + 1:]:
# Skip pairs that are already linked
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
continue
score = self.linker.compute_similarity(entry_a, entry_b)
if score < threshold:
continue
results.append({
"entry_a": {
"id": entry_a.id,
"title": entry_a.title,
"topics": entry_a.topics,
},
"entry_b": {
"id": entry_b.id,
"title": entry_b.title,
"topics": entry_b.topics,
},
"score": round(score, 4),
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
def discover(
self,
count: int = 3,
prefer_fading: bool = True,
topic: Optional[str] = None,
) -> list[ArchiveEntry]:
"""Serendipitous entry discovery weighted by vitality decay.
Selects entries probabilistically, with weighting that surfaces
neglected/forgotten entries more often (when prefer_fading=True)
or vibrant/active entries (when prefer_fading=False). Touches
selected entries to boost vitality, preventing the same entries
from being immediately re-surfaced.
Args:
count: Number of entries to discover (default 3).
prefer_fading: If True (default), weight toward fading entries.
If False, weight toward vibrant entries.
topic: If set, restrict to entries with this topic (case-insensitive).
Returns:
List of ArchiveEntry, up to count entries.
"""
import random
candidates = list(self._entries.values())
if not candidates:
return []
if topic:
topic_lower = topic.lower()
candidates = [e for e in candidates if topic_lower in [t.lower() for t in e.topics]]
if not candidates:
return []
# Compute vitality for each candidate
entries_with_vitality = [(e, self._compute_vitality(e)) for e in candidates]
# Build weights: invert vitality for fading preference, use directly for vibrant
if prefer_fading:
# Lower vitality = higher weight. Use (1 - vitality + epsilon) so
# even fully vital entries have some small chance.
weights = [1.0 - v + 0.01 for _, v in entries_with_vitality]
else:
# Higher vitality = higher weight. Use (vitality + epsilon).
weights = [v + 0.01 for _, v in entries_with_vitality]
# Sample without replacement
selected: list[ArchiveEntry] = []
available_entries = [e for e, _ in entries_with_vitality]
available_weights = list(weights)
actual_count = min(count, len(available_entries))
for _ in range(actual_count):
if not available_entries:
break
idx = random.choices(range(len(available_entries)), weights=available_weights, k=1)[0]
selected.append(available_entries.pop(idx))
available_weights.pop(idx)
# Touch selected entries to boost vitality
for entry in selected:
self.touch(entry.id)
return selected
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
Clears existing links and re-applies the holographic linker to every
entry pair. Useful after bulk ingestion or threshold changes.
Args:
threshold: Override the linker's default similarity threshold.
Returns:
Total number of links created.
"""
if threshold is not None:
old_threshold = self.linker.threshold
self.linker.threshold = threshold
# Clear all links
for entry in self._entries.values():
entry.links = []
entries = list(self._entries.values())
total_links = 0
# Re-link each entry against all others
for entry in entries:
candidates = [e for e in entries if e.id != entry.id]
new_links = self.linker.apply_links(entry, candidates)
total_links += new_links
if threshold is not None:
self.linker.threshold = old_threshold
self._save()
return total_links