Files
the-nexus/nexus/mnemosyne/archive.py
Alexander Whitestone 17e714c9d2 feat(mnemosyne): add memory decay system to MnemosyneArchive
Part of #1258.
- touch(entry_id): record access, boost vitality
- get_vitality(entry_id): current vitality status  
- fading(limit): most neglected entries
- vibrant(limit): most alive entries
- apply_decay(): decay all entries, persist
- stats() updated with avg_vitality, fading_count, vibrant_count

Decay: exponential with 30-day half-life.
Touch: 0.1 * (1 - current_vitality) — diminishing returns.
2026-04-12 05:42:37 +00:00

975 lines
35 KiB
Python

"""MnemosyneArchive — core archive class.
The living holographic archive. Stores entries, maintains links,
and provides query interfaces for retrieving connected knowledge.
"""
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.embeddings import get_embedding_backend, EmbeddingBackend
_EXPORT_VERSION = "1"
class MnemosyneArchive:
    """The holographic archive — stores and links entries.

    Phase 1 uses JSON file storage. Phase 2 will integrate with
    MemPalace (ChromaDB) for vector-semantic search.
    """

    def __init__(
        self,
        archive_path: Optional[Path] = None,
        embedding_backend: Optional[EmbeddingBackend] = None,
        auto_embed: bool = True,
    ):
        """Open (or create) an archive backed by a JSON file.

        Args:
            archive_path: Storage location; defaults to
                ``~/.hermes/mnemosyne/archive.json``.
            embedding_backend: Explicit backend used for semantic similarity.
            auto_embed: When True and no backend is given, try to obtain the
                default backend; any failure falls back to None (lexical
                similarity only).
        """
        self.path = archive_path or Path.home() / ".hermes" / "mnemosyne" / "archive.json"
        # Ensure the storage directory exists before any _save().
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._embedding_backend = embedding_backend
        if embedding_backend is None and auto_embed:
            try:
                self._embedding_backend = get_embedding_backend()
            except Exception:
                # Embeddings are optional — degrade silently to lexical mode.
                self._embedding_backend = None
        self.linker = HolographicLinker(embedding_backend=self._embedding_backend)
        # In-memory index: entry id -> ArchiveEntry.
        self._entries: dict[str, ArchiveEntry] = {}
        self._load()
def _load(self):
if self.path.exists():
try:
with open(self.path) as f:
data = json.load(f)
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
except (json.JSONDecodeError, KeyError):
pass # Start fresh on corrupt data
def _save(self):
data = {
"entries": [e.to_dict() for e in self._entries.values()],
"count": len(self._entries),
}
with open(self.path, "w") as f:
json.dump(data, f, indent=2)
def find_duplicate(self, entry: ArchiveEntry) -> Optional[ArchiveEntry]:
"""Return an existing entry with the same content hash, or None."""
for existing in self._entries.values():
if existing.content_hash == entry.content_hash and existing.id != entry.id:
return existing
return None
def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
"""Add an entry to the archive. Auto-links to related entries.
If an entry with the same content hash already exists, returns the
existing entry without creating a duplicate.
"""
duplicate = self.find_duplicate(entry)
if duplicate is not None:
return duplicate
self._entries[entry.id] = entry
if auto_link:
self.linker.apply_links(entry, list(self._entries.values()))
self._save()
return entry
    def update_entry(
        self,
        entry_id: str,
        title: Optional[str] = None,
        content: Optional[str] = None,
        metadata: Optional[dict] = None,
        auto_link: bool = True,
    ) -> ArchiveEntry:
        """Update title, content, and/or metadata on an existing entry.

        Bumps ``updated_at`` and re-runs auto-linking when content changes.
        NOTE(review): a metadata-only update does NOT bump ``updated_at``
        (only a title/content change does) — confirm this is intended.

        Args:
            entry_id: ID of the entry to update.
            title: New title, or None to leave unchanged.
            content: New content, or None to leave unchanged.
            metadata: Dict to merge into existing metadata (replaces keys present).
            auto_link: If True, re-run holographic linker after content change.

        Returns:
            The updated ArchiveEntry.

        Raises:
            KeyError: If entry_id does not exist.
        """
        entry = self._entries.get(entry_id)
        if entry is None:
            raise KeyError(entry_id)
        content_changed = False
        # Only mark changed when the value actually differs — avoids
        # needless hash/link recomputation on no-op updates.
        if title is not None and title != entry.title:
            entry.title = title
            content_changed = True
        if content is not None and content != entry.content:
            entry.content = content
            content_changed = True
        if metadata is not None:
            # Shallow merge: keys present in `metadata` overwrite existing ones.
            entry.metadata.update(metadata)
        if content_changed:
            # content_hash covers title + content — keep it in sync.
            entry.content_hash = _compute_content_hash(entry.title, entry.content)
            entry.updated_at = datetime.now(timezone.utc).isoformat()
        if content_changed and auto_link:
            # Clear old links from this entry and re-run linker
            for other in self._entries.values():
                if entry_id in other.links:
                    other.links.remove(entry_id)
            entry.links = []
            self.linker.apply_links(entry, list(self._entries.values()))
        self._save()
        return entry
def get(self, entry_id: str) -> Optional[ArchiveEntry]:
return self._entries.get(entry_id)
def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
"""Simple keyword search across titles and content."""
query_tokens = set(query.lower().split())
scored = []
for entry in self._entries.values():
text = f"{entry.title} {entry.content} {' '.join(entry.topics)}".lower()
hits = sum(1 for t in query_tokens if t in text)
if hits > 0:
scored.append((hits, entry))
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
    def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
        """Semantic search using embeddings or holographic linker similarity.

        With an embedding backend: cosine similarity between query vector and
        entry vectors, boosted by inbound link count.
        Without: Jaccard similarity on tokens with link boost.
        Falls back to keyword search if nothing meets the threshold.

        NOTE(review): entry vectors are recomputed from scratch on every
        query (no caching), so cost grows linearly with archive size —
        presumably acceptable for Phase 1; confirm.

        Args:
            query: Natural language query string.
            limit: Maximum number of results to return.
            threshold: Minimum similarity score to include in results.

        Returns:
            List of ArchiveEntry sorted by combined relevance score, descending.
        """
        # Count inbound links for link-boost
        inbound: dict[str, int] = {eid: 0 for eid in self._entries}
        for entry in self._entries.values():
            for linked_id in entry.links:
                if linked_id in inbound:
                    inbound[linked_id] += 1
        # `or 1` guards against max() == 0 (entries exist but none linked).
        max_inbound = max(inbound.values(), default=1) or 1
        # Try embedding-based search first
        if self._embedding_backend:
            query_vec = self._embedding_backend.embed(query)
            if query_vec:
                scored = []
                for entry in self._entries.values():
                    text = f"{entry.title} {entry.content} {' '.join(entry.topics)}"
                    entry_vec = self._embedding_backend.embed(text)
                    if not entry_vec:
                        continue
                    sim = self._embedding_backend.similarity(query_vec, entry_vec)
                    if sim >= threshold:
                        # Heavily linked-to entries gain up to +0.15.
                        link_boost = inbound[entry.id] / max_inbound * 0.15
                        scored.append((sim + link_boost, entry))
                if scored:
                    scored.sort(key=lambda x: x[0], reverse=True)
                    return [e for _, e in scored[:limit]]
        # Fallback: Jaccard token similarity
        query_tokens = HolographicLinker._tokenize(query)
        if not query_tokens:
            return []
        scored = []
        for entry in self._entries.values():
            entry_tokens = HolographicLinker._tokenize(f"{entry.title} {entry.content} {' '.join(entry.topics)}")
            if not entry_tokens:
                continue
            intersection = query_tokens & entry_tokens
            union = query_tokens | entry_tokens
            jaccard = len(intersection) / len(union)
            if jaccard >= threshold:
                # Lexical path uses a slightly larger boost cap (+0.2).
                link_boost = inbound[entry.id] / max_inbound * 0.2
                scored.append((jaccard + link_boost, entry))
        if scored:
            scored.sort(key=lambda x: x[0], reverse=True)
            return [e for _, e in scored[:limit]]
        # Final fallback: keyword search
        return self.search(query, limit=limit)
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
"""Get entries linked to a given entry, up to specified depth."""
visited = set()
frontier = {entry_id}
result = []
for _ in range(depth):
next_frontier = set()
for eid in frontier:
if eid in visited:
continue
visited.add(eid)
entry = self._entries.get(eid)
if entry:
for linked_id in entry.links:
if linked_id not in visited:
linked = self._entries.get(linked_id)
if linked:
result.append(linked)
next_frontier.add(linked_id)
frontier = next_frontier
return result
def by_topic(self, topic: str) -> list[ArchiveEntry]:
"""Get all entries tagged with a topic."""
topic_lower = topic.lower()
return [e for e in self._entries.values() if topic_lower in [t.lower() for t in e.topics]]
def remove(self, entry_id: str) -> bool:
"""Remove an entry and clean up all bidirectional links.
Returns True if the entry existed and was removed, False otherwise.
"""
if entry_id not in self._entries:
return False
# Remove back-links from all other entries
for other in self._entries.values():
if entry_id in other.links:
other.links.remove(entry_id)
del self._entries[entry_id]
self._save()
return True
def export(
self,
query: Optional[str] = None,
topics: Optional[list[str]] = None,
) -> dict:
"""Export a filtered subset of the archive.
Args:
query: keyword filter applied to title + content (case-insensitive)
topics: list of topic tags; entries must match at least one
Returns a JSON-serialisable dict with an ``entries`` list and metadata.
"""
candidates = list(self._entries.values())
if topics:
lower_topics = {t.lower() for t in topics}
candidates = [
e for e in candidates
if any(t.lower() in lower_topics for t in e.topics)
]
if query:
query_tokens = set(query.lower().split())
candidates = [
e for e in candidates
if any(
token in f"{e.title} {e.content} {' '.join(e.topics)}".lower()
for token in query_tokens
)
]
return {
"version": _EXPORT_VERSION,
"filters": {"query": query, "topics": topics},
"count": len(candidates),
"entries": [e.to_dict() for e in candidates],
}
def topic_counts(self) -> dict[str, int]:
"""Return a dict mapping topic name → entry count, sorted by count desc."""
counts: dict[str, int] = {}
for entry in self._entries.values():
for topic in entry.topics:
counts[topic] = counts.get(topic, 0) + 1
return dict(sorted(counts.items(), key=lambda x: x[1], reverse=True))
@property
def count(self) -> int:
return len(self._entries)
    def graph_data(
        self,
        topic_filter: Optional[str] = None,
    ) -> dict:
        """Export the full connection graph for 3D constellation visualization.

        Returns a dict with:
        - nodes: list of {id, title, topics, source, created_at}
        - edges: list of {source, target, weight} from holographic links

        Args:
            topic_filter: If set, only include entries matching this topic
                (case-insensitive) and edges between them.
        """
        entries = list(self._entries.values())
        if topic_filter:
            topic_lower = topic_filter.lower()
            entries = [
                e for e in entries
                if topic_lower in [t.lower() for t in e.topics]
            ]
        entry_ids = {e.id for e in entries}
        nodes = [
            {
                "id": e.id,
                "title": e.title,
                "topics": e.topics,
                "source": e.source,
                "created_at": e.created_at,
            }
            for e in entries
        ]
        # Build edges from links, dedup (A→B and B→A become one edge)
        seen_edges: set[tuple[str, str]] = set()
        edges = []
        for e in entries:
            for linked_id in e.links:
                # Drop edges that leave the filtered node set.
                if linked_id not in entry_ids:
                    continue
                # Canonical (min, max) ordering makes both directions identical.
                pair = (min(e.id, linked_id), max(e.id, linked_id))
                if pair in seen_edges:
                    continue
                seen_edges.add(pair)
                # Compute weight via linker for live similarity score
                linked = self._entries.get(linked_id)
                if linked:
                    weight = self.linker.compute_similarity(e, linked)
                    edges.append({
                        "source": pair[0],
                        "target": pair[1],
                        "weight": round(weight, 4),
                    })
        return {"nodes": nodes, "edges": edges}
def stats(self) -> dict:
entries = list(self._entries.values())
total_links = sum(len(e.links) for e in entries)
topics: set[str] = set()
for e in entries:
topics.update(e.topics)
# Orphans: entries with no links at all
orphans = sum(1 for e in entries if len(e.links) == 0)
# Link density: average links per entry (0 when empty)
n = len(entries)
link_density = round(total_links / n, 4) if n else 0.0
# Age distribution
timestamps = sorted(e.created_at for e in entries)
oldest_entry = timestamps[0] if timestamps else None
newest_entry = timestamps[-1] if timestamps else None
# Vitality summary
if n > 0:
vitalities = [self._compute_vitality(e) for e in entries]
avg_vitality = round(sum(vitalities) / n, 4)
fading_count = sum(1 for v in vitalities if v < 0.3)
vibrant_count = sum(1 for v in vitalities if v > 0.7)
else:
avg_vitality = 0.0
fading_count = 0
vibrant_count = 0
return {
"entries": n,
"total_links": total_links,
"unique_topics": len(topics),
"topics": sorted(topics),
"orphans": orphans,
"link_density": link_density,
"oldest_entry": oldest_entry,
"newest_entry": newest_entry,
"avg_vitality": avg_vitality,
"fading_count": fading_count,
"vibrant_count": vibrant_count,
}
def _build_adjacency(self) -> dict[str, set[str]]:
"""Build adjacency dict from entry links. Only includes valid references."""
adj: dict[str, set[str]] = {eid: set() for eid in self._entries}
for eid, entry in self._entries.items():
for linked_id in entry.links:
if linked_id in self._entries and linked_id != eid:
adj[eid].add(linked_id)
adj[linked_id].add(eid)
return adj
    def graph_clusters(self, min_size: int = 1) -> list[dict]:
        """Find connected component clusters in the holographic graph.

        Uses BFS to discover groups of entries that are reachable from each
        other through their links. Returns clusters sorted by size descending.

        Args:
            min_size: Minimum cluster size to include. The default of 1
                keeps isolated entries; pass 2+ to filter orphans out.

        Returns:
            List of dicts with keys: cluster_id, size, entries, top_topics,
            internal_edges, density
        """
        adj = self._build_adjacency()
        visited: set[str] = set()
        clusters: list[dict] = []
        cluster_id = 0
        for eid in self._entries:
            if eid in visited:
                continue
            # BFS from this entry to collect its whole connected component.
            component: list[str] = []
            queue = [eid]
            while queue:
                current = queue.pop(0)
                if current in visited:
                    continue
                visited.add(current)
                component.append(current)
                for neighbor in adj.get(current, set()):
                    if neighbor not in visited:
                        queue.append(neighbor)
            # Components below min_size (e.g. orphans when min_size >= 2) are skipped.
            if len(component) < min_size:
                continue
            # Collect topic frequencies and summed degrees across the cluster.
            cluster_topics: dict[str, int] = {}
            internal_edges = 0
            for cid in component:
                entry = self._entries[cid]
                for t in entry.topics:
                    cluster_topics[t] = cluster_topics.get(t, 0) + 1
                internal_edges += len(adj.get(cid, set()))
            internal_edges //= 2  # undirected, counted twice
            # Density: actual edges / possible edges
            n = len(component)
            max_edges = n * (n - 1) // 2
            density = round(internal_edges / max_edges, 4) if max_edges > 0 else 0.0
            # Top topics by frequency
            top_topics = sorted(cluster_topics.items(), key=lambda x: x[1], reverse=True)[:5]
            clusters.append({
                "cluster_id": cluster_id,
                "size": n,
                "entries": component,
                "top_topics": [t for t, _ in top_topics],
                "internal_edges": internal_edges,
                "density": density,
            })
            cluster_id += 1
        clusters.sort(key=lambda c: c["size"], reverse=True)
        return clusters
def hub_entries(self, limit: int = 10) -> list[dict]:
"""Find the most connected entries (highest degree centrality).
These are the "hubs" of the holographic graph — entries that bridge
many topics and attract many links.
Args:
limit: Maximum number of hubs to return.
Returns:
List of dicts with keys: entry, degree, inbound, outbound, topics
"""
adj = self._build_adjacency()
inbound: dict[str, int] = {eid: 0 for eid in self._entries}
for entry in self._entries.values():
for lid in entry.links:
if lid in inbound:
inbound[lid] += 1
hubs = []
for eid, entry in self._entries.items():
degree = len(adj.get(eid, set()))
if degree == 0:
continue
hubs.append({
"entry": entry,
"degree": degree,
"inbound": inbound.get(eid, 0),
"outbound": len(entry.links),
"topics": entry.topics,
})
hubs.sort(key=lambda h: h["degree"], reverse=True)
return hubs[:limit]
    def bridge_entries(self) -> list[dict]:
        """Find articulation points — entries whose removal would split a cluster.

        These are "bridge" entries in the holographic graph. Removing them
        disconnects members that were previously reachable through the bridge.
        Uses Tarjan's algorithm for finding articulation points.

        NOTE(review): the DFS is recursive, so a cluster deeper than the
        interpreter recursion limit (~1000 by default) could raise
        RecursionError — confirm expected archive sizes.

        Returns:
            List of dicts with keys: entry, cluster_size,
            components_after_removal, topics — sorted by
            components_after_removal descending.
        """
        adj = self._build_adjacency()
        # Find clusters first
        clusters = self.graph_clusters(min_size=3)
        if not clusters:
            return []
        # For each cluster, run Tarjan's algorithm
        bridges: list[dict] = []
        for cluster in clusters:
            members = set(cluster["entries"])
            if len(members) < 3:
                continue
            # Build subgraph adjacency restricted to this cluster's members.
            sub_adj = {eid: adj[eid] & members for eid in members}
            # Tarjan's DFS for articulation points
            discovery: dict[str, int] = {}  # DFS discovery time per node
            low: dict[str, int] = {}        # lowest discovery time reachable via back-edges
            parent: dict[str, Optional[str]] = {}
            ap: set[str] = set()            # articulation points found in this cluster
            timer = [0]                     # mutable counter shared with the nested DFS
            def dfs(u: str):
                children = 0
                discovery[u] = low[u] = timer[0]
                timer[0] += 1
                for v in sub_adj[u]:
                    if v not in discovery:
                        children += 1
                        parent[v] = u
                        dfs(v)
                        low[u] = min(low[u], low[v])
                        # u is AP if: root with 2+ children, or non-root with low[v] >= disc[u]
                        if parent.get(u) is None and children > 1:
                            ap.add(u)
                        if parent.get(u) is not None and low[v] >= discovery[u]:
                            ap.add(u)
                    elif v != parent.get(u):
                        low[u] = min(low[u], discovery[v])
            for eid in members:
                if eid not in discovery:
                    parent[eid] = None
                    dfs(eid)
            # For each articulation point, estimate what it bridges
            for ap_id in ap:
                ap_entry = self._entries[ap_id]
                # Remove it temporarily and count resulting components
                temp_adj = {k: v.copy() for k, v in sub_adj.items()}
                del temp_adj[ap_id]
                for k in temp_adj:
                    temp_adj[k].discard(ap_id)
                # BFS count components after removal
                temp_visited: set[str] = set()
                component_count = 0
                for mid in members:
                    if mid == ap_id or mid in temp_visited:
                        continue
                    component_count += 1
                    queue = [mid]
                    while queue:
                        cur = queue.pop(0)
                        if cur in temp_visited:
                            continue
                        temp_visited.add(cur)
                        for nb in temp_adj.get(cur, set()):
                            if nb not in temp_visited:
                                queue.append(nb)
                if component_count > 1:
                    bridges.append({
                        "entry": ap_entry,
                        "cluster_size": cluster["size"],
                        "components_after_removal": component_count,
                        "topics": ap_entry.topics,
                    })
        bridges.sort(key=lambda b: b["components_after_removal"], reverse=True)
        return bridges
def add_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
"""Add new tags to an existing entry (deduplicates, case-preserving).
Args:
entry_id: ID of the entry to update.
tags: Tags to add. Already-present tags (case-insensitive) are skipped.
Returns:
The updated ArchiveEntry.
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
existing_lower = {t.lower() for t in entry.topics}
for tag in tags:
if tag.lower() not in existing_lower:
entry.topics.append(tag)
existing_lower.add(tag.lower())
self._save()
return entry
def remove_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
"""Remove specific tags from an existing entry (case-insensitive match).
Args:
entry_id: ID of the entry to update.
tags: Tags to remove. Tags not present are silently ignored.
Returns:
The updated ArchiveEntry.
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
remove_lower = {t.lower() for t in tags}
entry.topics = [t for t in entry.topics if t.lower() not in remove_lower]
self._save()
return entry
def retag(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
"""Replace all tags on an existing entry (deduplicates new list).
Args:
entry_id: ID of the entry to update.
tags: New tag list. Duplicates (case-insensitive) are collapsed.
Returns:
The updated ArchiveEntry.
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
seen: set[str] = set()
deduped: list[str] = []
for tag in tags:
if tag.lower() not in seen:
seen.add(tag.lower())
deduped.append(tag)
entry.topics = deduped
self._save()
return entry
@staticmethod
def _parse_dt(dt_str: str) -> datetime:
"""Parse an ISO datetime string. Assumes UTC if no timezone is specified."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def by_date_range(self, start: str, end: str) -> list[ArchiveEntry]:
"""Return entries whose ``created_at`` falls within [start, end] (inclusive).
Args:
start: ISO datetime string for the range start (e.g. "2024-01-01" or
"2024-01-01T00:00:00Z"). Timezone-naive strings are treated as UTC.
end: ISO datetime string for the range end. Timezone-naive strings are
treated as UTC.
Returns:
List of ArchiveEntry sorted by ``created_at`` ascending.
"""
start_dt = self._parse_dt(start)
end_dt = self._parse_dt(end)
results = []
for entry in self._entries.values():
entry_dt = self._parse_dt(entry.created_at)
if start_dt <= entry_dt <= end_dt:
results.append(entry)
results.sort(key=lambda e: e.created_at)
return results
def temporal_neighbors(self, entry_id: str, window_days: int = 7) -> list[ArchiveEntry]:
"""Return entries created within ``window_days`` of a given entry.
The reference entry itself is excluded from results.
Args:
entry_id: ID of the anchor entry.
window_days: Number of days around the anchor's ``created_at`` to search.
Returns:
List of ArchiveEntry sorted by ``created_at`` ascending.
Raises:
KeyError: If ``entry_id`` does not exist in the archive.
"""
anchor = self._entries.get(entry_id)
if anchor is None:
raise KeyError(entry_id)
anchor_dt = self._parse_dt(anchor.created_at)
delta = timedelta(days=window_days)
window_start = anchor_dt - delta
window_end = anchor_dt + delta
results = []
for entry in self._entries.values():
if entry.id == entry_id:
continue
entry_dt = self._parse_dt(entry.created_at)
if window_start <= entry_dt <= window_end:
results.append(entry)
results.sort(key=lambda e: e.created_at)
return results
# ─── Memory Decay ─────────────────────────────────────────
# Decay parameters
_DECAY_HALF_LIFE_DAYS: float = 30.0 # Half-life for exponential decay
_TOUCH_BOOST_FACTOR: float = 0.1 # Base boost on access (diminishes as vitality → 1.0)
def touch(self, entry_id: str) -> ArchiveEntry:
"""Record an access to an entry, boosting its vitality.
The boost is ``_TOUCH_BOOST_FACTOR * (1 - current_vitality)`` —
diminishing returns as vitality approaches 1.0 ensures entries
can never exceed 1.0 through touch alone.
Args:
entry_id: ID of the entry to touch.
Returns:
The updated ArchiveEntry.
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
now = datetime.now(timezone.utc).isoformat()
# Compute current decayed vitality before boosting
current = self._compute_vitality(entry)
boost = self._TOUCH_BOOST_FACTOR * (1.0 - current)
entry.vitality = min(1.0, current + boost)
entry.last_accessed = now
self._save()
return entry
def _compute_vitality(self, entry: ArchiveEntry) -> float:
"""Compute the current vitality of an entry based on time decay.
Uses exponential decay: ``v = base * 0.5 ^ (hours_since_access / half_life_hours)``
If the entry has never been accessed, uses ``created_at`` as the
reference point. New entries with no access start at full vitality.
Args:
entry: The archive entry.
Returns:
Current vitality as a float in [0.0, 1.0].
"""
if entry.last_accessed is None:
# Never accessed — check age from creation
created = self._parse_dt(entry.created_at)
hours_elapsed = (datetime.now(timezone.utc) - created).total_seconds() / 3600
else:
last = self._parse_dt(entry.last_accessed)
hours_elapsed = (datetime.now(timezone.utc) - last).total_seconds() / 3600
half_life_hours = self._DECAY_HALF_LIFE_DAYS * 24
if hours_elapsed <= 0 or half_life_hours <= 0:
return entry.vitality
decayed = entry.vitality * (0.5 ** (hours_elapsed / half_life_hours))
return max(0.0, min(1.0, decayed))
def get_vitality(self, entry_id: str) -> dict:
"""Get the current vitality status of an entry.
Args:
entry_id: ID of the entry.
Returns:
Dict with keys: entry_id, title, vitality, last_accessed, age_days
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
current_vitality = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
return {
"entry_id": entry.id,
"title": entry.title,
"vitality": round(current_vitality, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
}
def fading(self, limit: int = 10) -> list[dict]:
"""Return entries with the lowest vitality (most neglected).
Args:
limit: Maximum number of entries to return.
Returns:
List of dicts sorted by vitality ascending (most faded first).
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
"""
scored = []
for entry in self._entries.values():
v = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
scored.append({
"entry_id": entry.id,
"title": entry.title,
"vitality": round(v, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
})
scored.sort(key=lambda x: x["vitality"])
return scored[:limit]
def vibrant(self, limit: int = 10) -> list[dict]:
"""Return entries with the highest vitality (most alive).
Args:
limit: Maximum number of entries to return.
Returns:
List of dicts sorted by vitality descending (most vibrant first).
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
"""
scored = []
for entry in self._entries.values():
v = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
scored.append({
"entry_id": entry.id,
"title": entry.title,
"vitality": round(v, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
})
scored.sort(key=lambda x: x["vitality"], reverse=True)
return scored[:limit]
def apply_decay(self) -> dict:
"""Apply time-based decay to all entries and persist.
Recomputes each entry's vitality based on elapsed time since
its last access (or creation if never accessed). Saves the
archive after updating.
Returns:
Dict with keys: total_entries, decayed_count, avg_vitality,
fading_count (entries below 0.3), vibrant_count (entries above 0.7)
"""
decayed = 0
total_vitality = 0.0
fading_count = 0
vibrant_count = 0
for entry in self._entries.values():
old_v = entry.vitality
new_v = self._compute_vitality(entry)
if abs(new_v - old_v) > 1e-6:
entry.vitality = new_v
decayed += 1
total_vitality += entry.vitality
if entry.vitality < 0.3:
fading_count += 1
if entry.vitality > 0.7:
vibrant_count += 1
n = len(self._entries)
self._save()
return {
"total_entries": n,
"decayed_count": decayed,
"avg_vitality": round(total_vitality / n, 4) if n else 0.0,
"fading_count": fading_count,
"vibrant_count": vibrant_count,
}
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
Clears existing links and re-applies the holographic linker to every
entry pair. Useful after bulk ingestion or threshold changes.
Args:
threshold: Override the linker's default similarity threshold.
Returns:
Total number of links created.
"""
if threshold is not None:
old_threshold = self.linker.threshold
self.linker.threshold = threshold
# Clear all links
for entry in self._entries.values():
entry.links = []
entries = list(self._entries.values())
total_links = 0
# Re-link each entry against all others
for entry in entries:
candidates = [e for e in entries if e.id != entry.id]
new_links = self.linker.apply_links(entry, candidates)
total_links += new_links
if threshold is not None:
self.linker.threshold = old_threshold
self._save()
return total_links