- Added discover() method to archive.py (probabilistic, vitality-weighted) - Added cmd_discover CLI handler with subparser - Supports: -n COUNT, -t TOPIC, --vibrant flag - prefer_fading=True surfaces neglected entries
1445 lines
53 KiB
Python
"""MnemosyneArchive — core archive class.
|
||
|
||
The living holographic archive. Stores entries, maintains links,
|
||
and provides query interfaces for retrieving connected knowledge.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from datetime import datetime, timedelta, timezone
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
|
||
from nexus.mnemosyne.linker import HolographicLinker
|
||
from nexus.mnemosyne.embeddings import get_embedding_backend, EmbeddingBackend
|
||
|
||
_EXPORT_VERSION = "1"
|
||
|
||
|
||
class MnemosyneArchive:
|
||
"""The holographic archive — stores and links entries.
|
||
|
||
Phase 1 uses JSON file storage. Phase 2 will integrate with
|
||
MemPalace (ChromaDB) for vector-semantic search.
|
||
"""
|
||
|
||
def __init__(
    self,
    archive_path: Optional[Path] = None,
    embedding_backend: Optional[EmbeddingBackend] = None,
    auto_embed: bool = True,
):
    """Open (or create) an archive backed by a JSON file.

    Args:
        archive_path: Storage path; defaults to
            ``~/.hermes/mnemosyne/archive.json``. Parent dirs are created.
        embedding_backend: Explicit embedding backend, or None.
        auto_embed: When True and no backend was given, try to auto-detect
            one via ``get_embedding_backend()``.
    """
    self.path = archive_path or Path.home() / ".hermes" / "mnemosyne" / "archive.json"
    self.path.parent.mkdir(parents=True, exist_ok=True)
    self._embedding_backend = embedding_backend
    if embedding_backend is None and auto_embed:
        try:
            self._embedding_backend = get_embedding_backend()
        except Exception:
            # Best-effort: embeddings are optional; without a backend the
            # archive falls back to token-based similarity.
            self._embedding_backend = None
    self.linker = HolographicLinker(embedding_backend=self._embedding_backend)
    # In-memory entry store, keyed by entry ID; hydrated from disk below.
    self._entries: dict[str, ArchiveEntry] = {}
    self._load()
|
||
|
||
def _load(self):
    """Hydrate ``self._entries`` from the JSON file at ``self.path``.

    A missing file means a brand-new archive. Corrupt or structurally
    invalid JSON is deliberately swallowed so construction never fails —
    the archive simply starts empty.
    """
    if self.path.exists():
        try:
            with open(self.path) as f:
                data = json.load(f)
                for entry_data in data.get("entries", []):
                    entry = ArchiveEntry.from_dict(entry_data)
                    self._entries[entry.id] = entry
        except (json.JSONDecodeError, KeyError):
            pass  # Start fresh on corrupt data
|
||
|
||
def _save(self):
|
||
data = {
|
||
"entries": [e.to_dict() for e in self._entries.values()],
|
||
"count": len(self._entries),
|
||
}
|
||
with open(self.path, "w") as f:
|
||
json.dump(data, f, indent=2)
|
||
|
||
def find_duplicate(self, entry: ArchiveEntry) -> Optional[ArchiveEntry]:
|
||
"""Return an existing entry with the same content hash, or None."""
|
||
for existing in self._entries.values():
|
||
if existing.content_hash == entry.content_hash and existing.id != entry.id:
|
||
return existing
|
||
return None
|
||
|
||
def add(self, entry: ArchiveEntry, auto_link: bool = True) -> ArchiveEntry:
|
||
"""Add an entry to the archive. Auto-links to related entries.
|
||
|
||
If an entry with the same content hash already exists, returns the
|
||
existing entry without creating a duplicate.
|
||
"""
|
||
duplicate = self.find_duplicate(entry)
|
||
if duplicate is not None:
|
||
return duplicate
|
||
self._entries[entry.id] = entry
|
||
if auto_link:
|
||
self.linker.apply_links(entry, list(self._entries.values()))
|
||
self._save()
|
||
return entry
|
||
|
||
def update_entry(
    self,
    entry_id: str,
    title: Optional[str] = None,
    content: Optional[str] = None,
    metadata: Optional[dict] = None,
    auto_link: bool = True,
) -> ArchiveEntry:
    """Update title, content, and/or metadata on an existing entry.

    Bumps ``updated_at`` and re-runs auto-linking when content changes.

    Args:
        entry_id: ID of the entry to update.
        title: New title, or None to leave unchanged.
        content: New content, or None to leave unchanged.
        metadata: Dict to merge into existing metadata (replaces keys present).
        auto_link: If True, re-run holographic linker after content change.

    Returns:
        The updated ArchiveEntry.

    Raises:
        KeyError: If entry_id does not exist.
    """
    entry = self._entries.get(entry_id)
    if entry is None:
        raise KeyError(entry_id)

    # Track whether title/content actually changed; a metadata-only update
    # does not invalidate the content hash or the link graph.
    content_changed = False
    if title is not None and title != entry.title:
        entry.title = title
        content_changed = True
    if content is not None and content != entry.content:
        entry.content = content
        content_changed = True
    if metadata is not None:
        entry.metadata.update(metadata)

    if content_changed:
        # The hash covers title + content, so either change needs a rehash.
        entry.content_hash = _compute_content_hash(entry.title, entry.content)

    entry.updated_at = datetime.now(timezone.utc).isoformat()

    if content_changed and auto_link:
        # Clear old links from this entry and re-run linker
        # (both directions: back-links held by others, and our own list).
        for other in self._entries.values():
            if entry_id in other.links:
                other.links.remove(entry_id)
        entry.links = []
        self.linker.apply_links(entry, list(self._entries.values()))

    self._save()
    return entry
|
||
|
||
def get(self, entry_id: str) -> Optional[ArchiveEntry]:
|
||
return self._entries.get(entry_id)
|
||
|
||
def search(self, query: str, limit: int = 10) -> list[ArchiveEntry]:
|
||
"""Simple keyword search across titles and content."""
|
||
query_tokens = set(query.lower().split())
|
||
scored = []
|
||
for entry in self._entries.values():
|
||
text = f"{entry.title} {entry.content} {' '.join(entry.topics)}".lower()
|
||
hits = sum(1 for t in query_tokens if t in text)
|
||
if hits > 0:
|
||
scored.append((hits, entry))
|
||
scored.sort(key=lambda x: x[0], reverse=True)
|
||
return [e for _, e in scored[:limit]]
|
||
|
||
def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
    """Semantic search using embeddings or holographic linker similarity.

    With an embedding backend: cosine similarity between query vector and
    entry vectors, boosted by inbound link count.
    Without: Jaccard similarity on tokens with link boost.
    Falls back to keyword search if nothing meets the threshold.

    Args:
        query: Natural language query string.
        limit: Maximum number of results to return.
        threshold: Minimum similarity score to include in results.

    Returns:
        List of ArchiveEntry sorted by combined relevance score, descending.
    """
    # Count inbound links for link-boost
    inbound: dict[str, int] = {eid: 0 for eid in self._entries}
    for entry in self._entries.values():
        for linked_id in entry.links:
            if linked_id in inbound:
                inbound[linked_id] += 1
    # `or 1` guards against all-zero counts, not just an empty archive.
    max_inbound = max(inbound.values(), default=1) or 1

    # Try embedding-based search first
    if self._embedding_backend:
        query_vec = self._embedding_backend.embed(query)
        if query_vec:
            scored = []
            for entry in self._entries.values():
                # NOTE(review): entry vectors are re-embedded on every
                # query — likely slow for large archives; consider caching
                # embeddings upstream.
                text = f"{entry.title} {entry.content} {' '.join(entry.topics)}"
                entry_vec = self._embedding_backend.embed(text)
                if not entry_vec:
                    continue
                sim = self._embedding_backend.similarity(query_vec, entry_vec)
                if sim >= threshold:
                    # Heavily-linked entries get up to +0.15 on top of sim.
                    link_boost = inbound[entry.id] / max_inbound * 0.15
                    scored.append((sim + link_boost, entry))
            if scored:
                scored.sort(key=lambda x: x[0], reverse=True)
                return [e for _, e in scored[:limit]]

    # Fallback: Jaccard token similarity
    query_tokens = HolographicLinker._tokenize(query)
    if not query_tokens:
        return []
    scored = []
    for entry in self._entries.values():
        entry_tokens = HolographicLinker._tokenize(f"{entry.title} {entry.content} {' '.join(entry.topics)}")
        if not entry_tokens:
            continue
        intersection = query_tokens & entry_tokens
        union = query_tokens | entry_tokens
        jaccard = len(intersection) / len(union)
        if jaccard >= threshold:
            # Slightly larger boost (0.2) than the embedding path.
            link_boost = inbound[entry.id] / max_inbound * 0.2
            scored.append((jaccard + link_boost, entry))
    if scored:
        scored.sort(key=lambda x: x[0], reverse=True)
        return [e for _, e in scored[:limit]]

    # Final fallback: keyword search
    return self.search(query, limit=limit)
|
||
|
||
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
    """Get entries linked to a given entry, up to specified depth.

    Breadth-first expansion: each iteration of the outer loop pushes the
    frontier one hop outward and appends newly reached entries to the
    result. The anchor entry itself is never included.

    NOTE(review): a node reachable from two frontier entries within the
    same wave can be appended to ``result`` twice before being marked
    visited — callers appear to tolerate duplicates; confirm before
    relying on uniqueness.
    """
    visited = set()
    frontier = {entry_id}
    result = []
    for _ in range(depth):
        next_frontier = set()
        for eid in frontier:
            if eid in visited:
                continue
            visited.add(eid)
            entry = self._entries.get(eid)
            if entry:
                for linked_id in entry.links:
                    if linked_id not in visited:
                        linked = self._entries.get(linked_id)
                        if linked:
                            result.append(linked)
                            next_frontier.add(linked_id)
        frontier = next_frontier
    return result
|
||
|
||
def by_topic(self, topic: str) -> list[ArchiveEntry]:
|
||
"""Get all entries tagged with a topic."""
|
||
topic_lower = topic.lower()
|
||
return [e for e in self._entries.values() if topic_lower in [t.lower() for t in e.topics]]
|
||
|
||
def remove(self, entry_id: str) -> bool:
|
||
"""Remove an entry and clean up all bidirectional links.
|
||
|
||
Returns True if the entry existed and was removed, False otherwise.
|
||
"""
|
||
if entry_id not in self._entries:
|
||
return False
|
||
# Remove back-links from all other entries
|
||
for other in self._entries.values():
|
||
if entry_id in other.links:
|
||
other.links.remove(entry_id)
|
||
del self._entries[entry_id]
|
||
self._save()
|
||
return True
|
||
|
||
def export(
    self,
    query: Optional[str] = None,
    topics: Optional[list[str]] = None,
) -> dict:
    """Export a filtered subset of the archive as a JSON-ready dict.

    Args:
        query: Case-insensitive keyword filter; an entry matches when any
            whitespace-separated token occurs in its title/content/topics.
        topics: Topic tags; an entry must carry at least one of them.

    Returns:
        Dict with ``version``, ``filters``, ``count`` and ``entries`` keys.
    """
    selected = list(self._entries.values())

    if topics:
        wanted = {t.lower() for t in topics}
        selected = [
            entry
            for entry in selected
            if any(t.lower() in wanted for t in entry.topics)
        ]

    if query:
        tokens = set(query.lower().split())

        def matches(entry: ArchiveEntry) -> bool:
            text = f"{entry.title} {entry.content} {' '.join(entry.topics)}".lower()
            return any(token in text for token in tokens)

        selected = [entry for entry in selected if matches(entry)]

    return {
        "version": _EXPORT_VERSION,
        "filters": {"query": query, "topics": topics},
        "count": len(selected),
        "entries": [entry.to_dict() for entry in selected],
    }
|
||
|
||
def topic_counts(self) -> dict[str, int]:
|
||
"""Return a dict mapping topic name → entry count, sorted by count desc."""
|
||
counts: dict[str, int] = {}
|
||
for entry in self._entries.values():
|
||
for topic in entry.topics:
|
||
counts[topic] = counts.get(topic, 0) + 1
|
||
return dict(sorted(counts.items(), key=lambda x: x[1], reverse=True))
|
||
|
||
@property
def count(self) -> int:
    """Number of entries currently held in the archive."""
    return len(self._entries)
|
||
|
||
def graph_data(
    self,
    topic_filter: Optional[str] = None,
) -> dict:
    """Export the full connection graph for 3D constellation visualization.

    Returns a dict with:
    - nodes: list of {id, title, topics, source, created_at}
    - edges: list of {source, target, weight} from holographic links

    Args:
        topic_filter: If set, only include entries matching this topic
            and edges between them.
    """
    entries = list(self._entries.values())

    if topic_filter:
        topic_lower = topic_filter.lower()
        entries = [
            e for e in entries
            if topic_lower in [t.lower() for t in e.topics]
        ]

    # IDs inside the (possibly filtered) subgraph; edges leaving it are dropped.
    entry_ids = {e.id for e in entries}

    nodes = [
        {
            "id": e.id,
            "title": e.title,
            "topics": e.topics,
            "source": e.source,
            "created_at": e.created_at,
        }
        for e in entries
    ]

    # Build edges from links, dedup (A→B and B→A become one edge)
    seen_edges: set[tuple[str, str]] = set()
    edges = []
    for e in entries:
        for linked_id in e.links:
            if linked_id not in entry_ids:
                continue
            # Canonical (min, max) ordering makes the pair direction-free.
            pair = (min(e.id, linked_id), max(e.id, linked_id))
            if pair in seen_edges:
                continue
            seen_edges.add(pair)
            # Compute weight via linker for live similarity score
            linked = self._entries.get(linked_id)
            if linked:
                weight = self.linker.compute_similarity(e, linked)
                edges.append({
                    "source": pair[0],
                    "target": pair[1],
                    "weight": round(weight, 4),
                })

    return {"nodes": nodes, "edges": edges}
|
||
|
||
def stats(self) -> dict:
    """Summarize the archive: sizes, topics, link and vitality metrics.

    Returns:
        Dict with keys: entries, total_links, unique_topics, topics,
        orphans, link_density, oldest_entry, newest_entry, avg_vitality,
        fading_count (vitality < 0.3), vibrant_count (vitality > 0.7).
    """
    entries = list(self._entries.values())
    total_links = sum(len(e.links) for e in entries)
    topics: set[str] = set()
    for e in entries:
        topics.update(e.topics)

    # Orphans: entries with no links at all
    orphans = sum(1 for e in entries if len(e.links) == 0)

    # Link density: average links per entry (0 when empty)
    n = len(entries)
    link_density = round(total_links / n, 4) if n else 0.0

    # Age distribution — lexicographic sort of ISO timestamps.
    # NOTE(review): this assumes uniform UTC offsets in created_at; mixed
    # offsets would sort incorrectly — confirm against entry producers.
    timestamps = sorted(e.created_at for e in entries)
    oldest_entry = timestamps[0] if timestamps else None
    newest_entry = timestamps[-1] if timestamps else None

    # Vitality summary
    if n > 0:
        vitalities = [self._compute_vitality(e) for e in entries]
        avg_vitality = round(sum(vitalities) / n, 4)
        fading_count = sum(1 for v in vitalities if v < 0.3)
        vibrant_count = sum(1 for v in vitalities if v > 0.7)
    else:
        avg_vitality = 0.0
        fading_count = 0
        vibrant_count = 0

    return {
        "entries": n,
        "total_links": total_links,
        "unique_topics": len(topics),
        "topics": sorted(topics),
        "orphans": orphans,
        "link_density": link_density,
        "oldest_entry": oldest_entry,
        "newest_entry": newest_entry,
        "avg_vitality": avg_vitality,
        "fading_count": fading_count,
        "vibrant_count": vibrant_count,
    }
|
||
|
||
def _build_adjacency(self) -> dict[str, set[str]]:
|
||
"""Build adjacency dict from entry links. Only includes valid references."""
|
||
adj: dict[str, set[str]] = {eid: set() for eid in self._entries}
|
||
for eid, entry in self._entries.items():
|
||
for linked_id in entry.links:
|
||
if linked_id in self._entries and linked_id != eid:
|
||
adj[eid].add(linked_id)
|
||
adj[linked_id].add(eid)
|
||
return adj
|
||
|
||
def graph_clusters(self, min_size: int = 1) -> list[dict]:
|
||
"""Find connected component clusters in the holographic graph.
|
||
|
||
Uses BFS to discover groups of entries that are reachable from each
|
||
other through their links. Returns clusters sorted by size descending.
|
||
|
||
Args:
|
||
min_size: Minimum cluster size to include (filters out isolated entries).
|
||
|
||
Returns:
|
||
List of dicts with keys: cluster_id, size, entries, topics, density
|
||
"""
|
||
adj = self._build_adjacency()
|
||
visited: set[str] = set()
|
||
clusters: list[dict] = []
|
||
cluster_id = 0
|
||
|
||
for eid in self._entries:
|
||
if eid in visited:
|
||
continue
|
||
# BFS from this entry
|
||
component: list[str] = []
|
||
queue = [eid]
|
||
while queue:
|
||
current = queue.pop(0)
|
||
if current in visited:
|
||
continue
|
||
visited.add(current)
|
||
component.append(current)
|
||
for neighbor in adj.get(current, set()):
|
||
if neighbor not in visited:
|
||
queue.append(neighbor)
|
||
|
||
# Single-entry clusters are orphans
|
||
if len(component) < min_size:
|
||
continue
|
||
|
||
# Collect topics from cluster entries
|
||
cluster_topics: dict[str, int] = {}
|
||
internal_edges = 0
|
||
for cid in component:
|
||
entry = self._entries[cid]
|
||
for t in entry.topics:
|
||
cluster_topics[t] = cluster_topics.get(t, 0) + 1
|
||
internal_edges += len(adj.get(cid, set()))
|
||
internal_edges //= 2 # undirected, counted twice
|
||
|
||
# Density: actual edges / possible edges
|
||
n = len(component)
|
||
max_edges = n * (n - 1) // 2
|
||
density = round(internal_edges / max_edges, 4) if max_edges > 0 else 0.0
|
||
|
||
# Top topics by frequency
|
||
top_topics = sorted(cluster_topics.items(), key=lambda x: x[1], reverse=True)[:5]
|
||
|
||
clusters.append({
|
||
"cluster_id": cluster_id,
|
||
"size": n,
|
||
"entries": component,
|
||
"top_topics": [t for t, _ in top_topics],
|
||
"internal_edges": internal_edges,
|
||
"density": density,
|
||
})
|
||
cluster_id += 1
|
||
|
||
clusters.sort(key=lambda c: c["size"], reverse=True)
|
||
return clusters
|
||
|
||
def hub_entries(self, limit: int = 10) -> list[dict]:
|
||
"""Find the most connected entries (highest degree centrality).
|
||
|
||
These are the "hubs" of the holographic graph — entries that bridge
|
||
many topics and attract many links.
|
||
|
||
Args:
|
||
limit: Maximum number of hubs to return.
|
||
|
||
Returns:
|
||
List of dicts with keys: entry, degree, inbound, outbound, topics
|
||
"""
|
||
adj = self._build_adjacency()
|
||
inbound: dict[str, int] = {eid: 0 for eid in self._entries}
|
||
|
||
for entry in self._entries.values():
|
||
for lid in entry.links:
|
||
if lid in inbound:
|
||
inbound[lid] += 1
|
||
|
||
hubs = []
|
||
for eid, entry in self._entries.items():
|
||
degree = len(adj.get(eid, set()))
|
||
if degree == 0:
|
||
continue
|
||
hubs.append({
|
||
"entry": entry,
|
||
"degree": degree,
|
||
"inbound": inbound.get(eid, 0),
|
||
"outbound": len(entry.links),
|
||
"topics": entry.topics,
|
||
})
|
||
|
||
hubs.sort(key=lambda h: h["degree"], reverse=True)
|
||
return hubs[:limit]
|
||
|
||
def bridge_entries(self) -> list[dict]:
    """Find articulation points — entries whose removal would split a cluster.

    These are "bridge" entries in the holographic graph. Removing them
    disconnects members that were previously reachable through the bridge.
    Uses Tarjan's algorithm for finding articulation points.

    NOTE(review): the DFS below is recursive — clusters larger than
    Python's recursion limit (~1000 by default) would raise RecursionError;
    confirm expected cluster sizes.

    Returns:
        List of dicts with keys: entry, cluster_size,
        components_after_removal, topics
    """
    adj = self._build_adjacency()

    # Find clusters first
    clusters = self.graph_clusters(min_size=3)
    if not clusters:
        return []

    # For each cluster, run Tarjan's algorithm
    bridges: list[dict] = []
    for cluster in clusters:
        members = set(cluster["entries"])
        if len(members) < 3:
            continue

        # Build subgraph adjacency (edges restricted to cluster members).
        sub_adj = {eid: adj[eid] & members for eid in members}

        # Tarjan's DFS for articulation points
        discovery: dict[str, int] = {}  # DFS discovery time per node
        low: dict[str, int] = {}        # lowest discovery time reachable
        parent: dict[str, Optional[str]] = {}
        ap: set[str] = set()            # articulation points found
        timer = [0]                     # mutable counter shared with dfs()

        def dfs(u: str):
            children = 0
            discovery[u] = low[u] = timer[0]
            timer[0] += 1
            for v in sub_adj[u]:
                if v not in discovery:
                    children += 1
                    parent[v] = u
                    dfs(v)
                    low[u] = min(low[u], low[v])

                    # u is AP if: root with 2+ children, or non-root with low[v] >= disc[u]
                    if parent.get(u) is None and children > 1:
                        ap.add(u)
                    if parent.get(u) is not None and low[v] >= discovery[u]:
                        ap.add(u)
                elif v != parent.get(u):
                    # Back edge: v was already discovered and is not our parent.
                    low[u] = min(low[u], discovery[v])

        for eid in members:
            if eid not in discovery:
                parent[eid] = None
                dfs(eid)

        # For each articulation point, estimate what it bridges
        for ap_id in ap:
            ap_entry = self._entries[ap_id]
            # Remove it temporarily and count resulting components
            temp_adj = {k: v.copy() for k, v in sub_adj.items()}
            del temp_adj[ap_id]
            for k in temp_adj:
                temp_adj[k].discard(ap_id)

            # BFS count components after removal
            temp_visited: set[str] = set()
            component_count = 0
            for mid in members:
                if mid == ap_id or mid in temp_visited:
                    continue
                component_count += 1
                queue = [mid]
                while queue:
                    cur = queue.pop(0)
                    if cur in temp_visited:
                        continue
                    temp_visited.add(cur)
                    for nb in temp_adj.get(cur, set()):
                        if nb not in temp_visited:
                            queue.append(nb)

            # Only report it when removal genuinely splits the cluster.
            if component_count > 1:
                bridges.append({
                    "entry": ap_entry,
                    "cluster_size": cluster["size"],
                    "components_after_removal": component_count,
                    "topics": ap_entry.topics,
                })

    bridges.sort(key=lambda b: b["components_after_removal"], reverse=True)
    return bridges
|
||
|
||
def add_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
|
||
"""Add new tags to an existing entry (deduplicates, case-preserving).
|
||
|
||
Args:
|
||
entry_id: ID of the entry to update.
|
||
tags: Tags to add. Already-present tags (case-insensitive) are skipped.
|
||
|
||
Returns:
|
||
The updated ArchiveEntry.
|
||
|
||
Raises:
|
||
KeyError: If entry_id does not exist.
|
||
"""
|
||
entry = self._entries.get(entry_id)
|
||
if entry is None:
|
||
raise KeyError(entry_id)
|
||
existing_lower = {t.lower() for t in entry.topics}
|
||
for tag in tags:
|
||
if tag.lower() not in existing_lower:
|
||
entry.topics.append(tag)
|
||
existing_lower.add(tag.lower())
|
||
self._save()
|
||
return entry
|
||
|
||
def remove_tags(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
|
||
"""Remove specific tags from an existing entry (case-insensitive match).
|
||
|
||
Args:
|
||
entry_id: ID of the entry to update.
|
||
tags: Tags to remove. Tags not present are silently ignored.
|
||
|
||
Returns:
|
||
The updated ArchiveEntry.
|
||
|
||
Raises:
|
||
KeyError: If entry_id does not exist.
|
||
"""
|
||
entry = self._entries.get(entry_id)
|
||
if entry is None:
|
||
raise KeyError(entry_id)
|
||
remove_lower = {t.lower() for t in tags}
|
||
entry.topics = [t for t in entry.topics if t.lower() not in remove_lower]
|
||
self._save()
|
||
return entry
|
||
|
||
def retag(self, entry_id: str, tags: list[str]) -> ArchiveEntry:
|
||
"""Replace all tags on an existing entry (deduplicates new list).
|
||
|
||
Args:
|
||
entry_id: ID of the entry to update.
|
||
tags: New tag list. Duplicates (case-insensitive) are collapsed.
|
||
|
||
Returns:
|
||
The updated ArchiveEntry.
|
||
|
||
Raises:
|
||
KeyError: If entry_id does not exist.
|
||
"""
|
||
entry = self._entries.get(entry_id)
|
||
if entry is None:
|
||
raise KeyError(entry_id)
|
||
seen: set[str] = set()
|
||
deduped: list[str] = []
|
||
for tag in tags:
|
||
if tag.lower() not in seen:
|
||
seen.add(tag.lower())
|
||
deduped.append(tag)
|
||
entry.topics = deduped
|
||
self._save()
|
||
return entry
|
||
|
||
@staticmethod
|
||
def _parse_dt(dt_str: str) -> datetime:
|
||
"""Parse an ISO datetime string. Assumes UTC if no timezone is specified."""
|
||
dt = datetime.fromisoformat(dt_str)
|
||
if dt.tzinfo is None:
|
||
dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt
|
||
|
||
def by_date_range(self, start: str, end: str) -> list[ArchiveEntry]:
|
||
"""Return entries whose ``created_at`` falls within [start, end] (inclusive).
|
||
|
||
Args:
|
||
start: ISO datetime string for the range start (e.g. "2024-01-01" or
|
||
"2024-01-01T00:00:00Z"). Timezone-naive strings are treated as UTC.
|
||
end: ISO datetime string for the range end. Timezone-naive strings are
|
||
treated as UTC.
|
||
|
||
Returns:
|
||
List of ArchiveEntry sorted by ``created_at`` ascending.
|
||
"""
|
||
start_dt = self._parse_dt(start)
|
||
end_dt = self._parse_dt(end)
|
||
results = []
|
||
for entry in self._entries.values():
|
||
entry_dt = self._parse_dt(entry.created_at)
|
||
if start_dt <= entry_dt <= end_dt:
|
||
results.append(entry)
|
||
results.sort(key=lambda e: e.created_at)
|
||
return results
|
||
|
||
def temporal_neighbors(self, entry_id: str, window_days: int = 7) -> list[ArchiveEntry]:
|
||
"""Return entries created within ``window_days`` of a given entry.
|
||
|
||
The reference entry itself is excluded from results.
|
||
|
||
Args:
|
||
entry_id: ID of the anchor entry.
|
||
window_days: Number of days around the anchor's ``created_at`` to search.
|
||
|
||
Returns:
|
||
List of ArchiveEntry sorted by ``created_at`` ascending.
|
||
|
||
Raises:
|
||
KeyError: If ``entry_id`` does not exist in the archive.
|
||
"""
|
||
anchor = self._entries.get(entry_id)
|
||
if anchor is None:
|
||
raise KeyError(entry_id)
|
||
anchor_dt = self._parse_dt(anchor.created_at)
|
||
delta = timedelta(days=window_days)
|
||
window_start = anchor_dt - delta
|
||
window_end = anchor_dt + delta
|
||
results = []
|
||
for entry in self._entries.values():
|
||
if entry.id == entry_id:
|
||
continue
|
||
entry_dt = self._parse_dt(entry.created_at)
|
||
if window_start <= entry_dt <= window_end:
|
||
results.append(entry)
|
||
results.sort(key=lambda e: e.created_at)
|
||
return results
|
||
|
||
# ─── Memory Decay ─────────────────────────────────────────

# Decay parameters — class-level tunables shared by all archive instances;
# consumed by _compute_vitality() and touch() below.
_DECAY_HALF_LIFE_DAYS: float = 30.0  # Half-life for exponential decay
_TOUCH_BOOST_FACTOR: float = 0.1  # Base boost on access (diminishes as vitality → 1.0)
|
||
|
||
def touch(self, entry_id: str) -> ArchiveEntry:
|
||
"""Record an access to an entry, boosting its vitality.
|
||
|
||
The boost is ``_TOUCH_BOOST_FACTOR * (1 - current_vitality)`` —
|
||
diminishing returns as vitality approaches 1.0 ensures entries
|
||
can never exceed 1.0 through touch alone.
|
||
|
||
Args:
|
||
entry_id: ID of the entry to touch.
|
||
|
||
Returns:
|
||
The updated ArchiveEntry.
|
||
|
||
Raises:
|
||
KeyError: If entry_id does not exist.
|
||
"""
|
||
entry = self._entries.get(entry_id)
|
||
if entry is None:
|
||
raise KeyError(entry_id)
|
||
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
|
||
# Compute current decayed vitality before boosting
|
||
current = self._compute_vitality(entry)
|
||
boost = self._TOUCH_BOOST_FACTOR * (1.0 - current)
|
||
entry.vitality = min(1.0, current + boost)
|
||
entry.last_accessed = now
|
||
self._save()
|
||
return entry
|
||
|
||
def _compute_vitality(self, entry: ArchiveEntry) -> float:
|
||
"""Compute the current vitality of an entry based on time decay.
|
||
|
||
Uses exponential decay: ``v = base * 0.5 ^ (hours_since_access / half_life_hours)``
|
||
|
||
If the entry has never been accessed, uses ``created_at`` as the
|
||
reference point. New entries with no access start at full vitality.
|
||
|
||
Args:
|
||
entry: The archive entry.
|
||
|
||
Returns:
|
||
Current vitality as a float in [0.0, 1.0].
|
||
"""
|
||
if entry.last_accessed is None:
|
||
# Never accessed — check age from creation
|
||
created = self._parse_dt(entry.created_at)
|
||
hours_elapsed = (datetime.now(timezone.utc) - created).total_seconds() / 3600
|
||
else:
|
||
last = self._parse_dt(entry.last_accessed)
|
||
hours_elapsed = (datetime.now(timezone.utc) - last).total_seconds() / 3600
|
||
|
||
half_life_hours = self._DECAY_HALF_LIFE_DAYS * 24
|
||
if hours_elapsed <= 0 or half_life_hours <= 0:
|
||
return entry.vitality
|
||
|
||
decayed = entry.vitality * (0.5 ** (hours_elapsed / half_life_hours))
|
||
return max(0.0, min(1.0, decayed))
|
||
|
||
def get_vitality(self, entry_id: str) -> dict:
|
||
"""Get the current vitality status of an entry.
|
||
|
||
Args:
|
||
entry_id: ID of the entry.
|
||
|
||
Returns:
|
||
Dict with keys: entry_id, title, vitality, last_accessed, age_days
|
||
|
||
Raises:
|
||
KeyError: If entry_id does not exist.
|
||
"""
|
||
entry = self._entries.get(entry_id)
|
||
if entry is None:
|
||
raise KeyError(entry_id)
|
||
|
||
current_vitality = self._compute_vitality(entry)
|
||
created = self._parse_dt(entry.created_at)
|
||
age_days = (datetime.now(timezone.utc) - created).days
|
||
|
||
return {
|
||
"entry_id": entry.id,
|
||
"title": entry.title,
|
||
"vitality": round(current_vitality, 4),
|
||
"last_accessed": entry.last_accessed,
|
||
"age_days": age_days,
|
||
}
|
||
|
||
def fading(self, limit: int = 10) -> list[dict]:
|
||
"""Return entries with the lowest vitality (most neglected).
|
||
|
||
Args:
|
||
limit: Maximum number of entries to return.
|
||
|
||
Returns:
|
||
List of dicts sorted by vitality ascending (most faded first).
|
||
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
|
||
"""
|
||
scored = []
|
||
for entry in self._entries.values():
|
||
v = self._compute_vitality(entry)
|
||
created = self._parse_dt(entry.created_at)
|
||
age_days = (datetime.now(timezone.utc) - created).days
|
||
scored.append({
|
||
"entry_id": entry.id,
|
||
"title": entry.title,
|
||
"vitality": round(v, 4),
|
||
"last_accessed": entry.last_accessed,
|
||
"age_days": age_days,
|
||
})
|
||
scored.sort(key=lambda x: x["vitality"])
|
||
return scored[:limit]
|
||
|
||
def vibrant(self, limit: int = 10) -> list[dict]:
|
||
"""Return entries with the highest vitality (most alive).
|
||
|
||
Args:
|
||
limit: Maximum number of entries to return.
|
||
|
||
Returns:
|
||
List of dicts sorted by vitality descending (most vibrant first).
|
||
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
|
||
"""
|
||
scored = []
|
||
for entry in self._entries.values():
|
||
v = self._compute_vitality(entry)
|
||
created = self._parse_dt(entry.created_at)
|
||
age_days = (datetime.now(timezone.utc) - created).days
|
||
scored.append({
|
||
"entry_id": entry.id,
|
||
"title": entry.title,
|
||
"vitality": round(v, 4),
|
||
"last_accessed": entry.last_accessed,
|
||
"age_days": age_days,
|
||
})
|
||
scored.sort(key=lambda x: x["vitality"], reverse=True)
|
||
return scored[:limit]
|
||
|
||
def apply_decay(self) -> dict:
|
||
"""Apply time-based decay to all entries and persist.
|
||
|
||
Recomputes each entry's vitality based on elapsed time since
|
||
its last access (or creation if never accessed). Saves the
|
||
archive after updating.
|
||
|
||
Returns:
|
||
Dict with keys: total_entries, decayed_count, avg_vitality,
|
||
fading_count (entries below 0.3), vibrant_count (entries above 0.7)
|
||
"""
|
||
decayed = 0
|
||
total_vitality = 0.0
|
||
fading_count = 0
|
||
vibrant_count = 0
|
||
|
||
for entry in self._entries.values():
|
||
old_v = entry.vitality
|
||
new_v = self._compute_vitality(entry)
|
||
if abs(new_v - old_v) > 1e-6:
|
||
entry.vitality = new_v
|
||
decayed += 1
|
||
total_vitality += entry.vitality
|
||
if entry.vitality < 0.3:
|
||
fading_count += 1
|
||
if entry.vitality > 0.7:
|
||
vibrant_count += 1
|
||
|
||
n = len(self._entries)
|
||
self._save()
|
||
|
||
return {
|
||
"total_entries": n,
|
||
"decayed_count": decayed,
|
||
"avg_vitality": round(total_vitality / n, 4) if n else 0.0,
|
||
"fading_count": fading_count,
|
||
"vibrant_count": vibrant_count,
|
||
}
|
||
|
||
def consolidate(
|
||
self,
|
||
threshold: float = 0.9,
|
||
dry_run: bool = False,
|
||
) -> list[dict]:
|
||
"""Scan the archive and merge duplicate/near-duplicate entries.
|
||
|
||
Two entries are considered duplicates if:
|
||
- They share the same ``content_hash`` (exact duplicate), or
|
||
- Their similarity score (via HolographicLinker) exceeds ``threshold``
|
||
(near-duplicate when an embedding backend is available or Jaccard is
|
||
high enough at the given threshold).
|
||
|
||
Merge strategy:
|
||
- Keep the *older* entry (earlier ``created_at``).
|
||
- Union topics from both entries (case-deduped).
|
||
- Merge metadata from newer into older (older values win on conflicts).
|
||
- Transfer all links from the newer entry to the older entry.
|
||
- Delete the newer entry.
|
||
|
||
Args:
|
||
threshold: Similarity threshold for near-duplicate detection (0.0–1.0).
|
||
Default 0.9 is intentionally conservative.
|
||
dry_run: If True, return the list of would-be merges without mutating
|
||
the archive.
|
||
|
||
Returns:
|
||
List of dicts, one per merged pair::
|
||
|
||
{
|
||
"kept": <entry_id of survivor>,
|
||
"removed": <entry_id of duplicate>,
|
||
"reason": "exact_hash" | "semantic_similarity",
|
||
"score": float, # 1.0 for exact hash matches
|
||
"dry_run": bool,
|
||
}
|
||
"""
|
||
merges: list[dict] = []
|
||
entries = list(self._entries.values())
|
||
removed_ids: set[str] = set()
|
||
|
||
for i, entry_a in enumerate(entries):
|
||
if entry_a.id in removed_ids:
|
||
continue
|
||
for entry_b in entries[i + 1:]:
|
||
if entry_b.id in removed_ids:
|
||
continue
|
||
|
||
# Determine if they are duplicates
|
||
reason: Optional[str] = None
|
||
score: float = 0.0
|
||
|
||
if (
|
||
entry_a.content_hash is not None
|
||
and entry_b.content_hash is not None
|
||
and entry_a.content_hash == entry_b.content_hash
|
||
):
|
||
reason = "exact_hash"
|
||
score = 1.0
|
||
else:
|
||
sim = self.linker.compute_similarity(entry_a, entry_b)
|
||
if sim >= threshold:
|
||
reason = "semantic_similarity"
|
||
score = sim
|
||
|
||
if reason is None:
|
||
continue
|
||
|
||
# Decide which entry to keep (older survives)
|
||
if entry_a.created_at <= entry_b.created_at:
|
||
kept, removed = entry_a, entry_b
|
||
else:
|
||
kept, removed = entry_b, entry_a
|
||
|
||
merges.append({
|
||
"kept": kept.id,
|
||
"removed": removed.id,
|
||
"reason": reason,
|
||
"score": round(score, 4),
|
||
"dry_run": dry_run,
|
||
})
|
||
|
||
if not dry_run:
|
||
# Merge topics (case-deduped)
|
||
existing_lower = {t.lower() for t in kept.topics}
|
||
for tag in removed.topics:
|
||
if tag.lower() not in existing_lower:
|
||
kept.topics.append(tag)
|
||
existing_lower.add(tag.lower())
|
||
|
||
# Merge metadata (kept wins on key conflicts)
|
||
for k, v in removed.metadata.items():
|
||
if k not in kept.metadata:
|
||
kept.metadata[k] = v
|
||
|
||
# Transfer links: add removed's links to kept
|
||
kept_links_set = set(kept.links)
|
||
for lid in removed.links:
|
||
if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
|
||
kept.links.append(lid)
|
||
kept_links_set.add(lid)
|
||
# Update the other entry's back-link
|
||
other = self._entries.get(lid)
|
||
if other and kept.id not in other.links:
|
||
other.links.append(kept.id)
|
||
|
||
# Remove back-links pointing at the removed entry
|
||
for other in self._entries.values():
|
||
if removed.id in other.links:
|
||
other.links.remove(removed.id)
|
||
if other.id != kept.id and kept.id not in other.links:
|
||
other.links.append(kept.id)
|
||
|
||
del self._entries[removed.id]
|
||
removed_ids.add(removed.id)
|
||
|
||
if not dry_run and merges:
|
||
self._save()
|
||
|
||
return merges
|
||
|
||
|
||
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
|
||
"""Find shortest path between two entries through the connection graph.
|
||
|
||
Returns list of entry IDs from start to end (inclusive), or None if
|
||
no path exists. Uses BFS for unweighted shortest path.
|
||
"""
|
||
if start_id == end_id:
|
||
return [start_id] if start_id in self._entries else None
|
||
if start_id not in self._entries or end_id not in self._entries:
|
||
return None
|
||
|
||
adj = self._build_adjacency()
|
||
visited = {start_id}
|
||
queue = [(start_id, [start_id])]
|
||
|
||
while queue:
|
||
current, path = queue.pop(0)
|
||
for neighbor in adj.get(current, []):
|
||
if neighbor == end_id:
|
||
return path + [neighbor]
|
||
if neighbor not in visited:
|
||
visited.add(neighbor)
|
||
queue.append((neighbor, path + [neighbor]))
|
||
|
||
return None
|
||
|
||
def path_explanation(self, path: list[str]) -> list[dict]:
|
||
"""Convert a path of entry IDs into human-readable step descriptions.
|
||
|
||
Returns list of dicts with 'id', 'title', and 'topics' for each step.
|
||
"""
|
||
steps = []
|
||
for entry_id in path:
|
||
entry = self._entries.get(entry_id)
|
||
if entry:
|
||
steps.append({
|
||
"id": entry.id,
|
||
"title": entry.title,
|
||
"topics": entry.topics,
|
||
"content_preview": entry.content[:120] + "..." if len(entry.content) > 120 else entry.content,
|
||
})
|
||
else:
|
||
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
|
||
return steps
|
||
|
||
# ─── Snapshot / Backup ────────────────────────────────────
|
||
|
||
def _snapshot_dir(self) -> Path:
|
||
"""Return (and create) the snapshots directory next to the archive."""
|
||
d = self.path.parent / "snapshots"
|
||
d.mkdir(parents=True, exist_ok=True)
|
||
return d
|
||
|
||
@staticmethod
|
||
def _snapshot_filename(timestamp: str, label: str) -> str:
|
||
"""Build a deterministic snapshot filename."""
|
||
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
|
||
return f"{timestamp}_{safe_label}.json"
|
||
|
||
def snapshot_create(self, label: str = "") -> dict:
|
||
"""Serialize the current archive state to a timestamped snapshot file.
|
||
|
||
Args:
|
||
label: Human-readable label for the snapshot (optional).
|
||
|
||
Returns:
|
||
Dict with keys: snapshot_id, label, created_at, entry_count, path
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
timestamp = now.strftime("%Y%m%d_%H%M%S")
|
||
filename = self._snapshot_filename(timestamp, label)
|
||
snapshot_id = filename[:-5] # strip .json
|
||
snap_path = self._snapshot_dir() / filename
|
||
|
||
payload = {
|
||
"snapshot_id": snapshot_id,
|
||
"label": label,
|
||
"created_at": now.isoformat(),
|
||
"entry_count": len(self._entries),
|
||
"archive_path": str(self.path),
|
||
"entries": [e.to_dict() for e in self._entries.values()],
|
||
}
|
||
with open(snap_path, "w") as f:
|
||
json.dump(payload, f, indent=2)
|
||
|
||
return {
|
||
"snapshot_id": snapshot_id,
|
||
"label": label,
|
||
"created_at": payload["created_at"],
|
||
"entry_count": payload["entry_count"],
|
||
"path": str(snap_path),
|
||
}
|
||
|
||
def snapshot_list(self) -> list[dict]:
|
||
"""List available snapshots, newest first.
|
||
|
||
Returns:
|
||
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
|
||
"""
|
||
snap_dir = self._snapshot_dir()
|
||
snapshots = []
|
||
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
|
||
try:
|
||
with open(snap_path) as f:
|
||
data = json.load(f)
|
||
snapshots.append({
|
||
"snapshot_id": data.get("snapshot_id", snap_path.stem),
|
||
"label": data.get("label", ""),
|
||
"created_at": data.get("created_at", ""),
|
||
"entry_count": data.get("entry_count", len(data.get("entries", []))),
|
||
"path": str(snap_path),
|
||
})
|
||
except (json.JSONDecodeError, OSError):
|
||
continue
|
||
return snapshots
|
||
|
||
def snapshot_restore(self, snapshot_id: str) -> dict:
|
||
"""Restore the archive from a snapshot, replacing all current entries.
|
||
|
||
Args:
|
||
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
|
||
|
||
Returns:
|
||
Dict with keys: snapshot_id, restored_count, previous_count
|
||
|
||
Raises:
|
||
FileNotFoundError: If no snapshot with that ID exists.
|
||
"""
|
||
snap_dir = self._snapshot_dir()
|
||
snap_path = snap_dir / f"{snapshot_id}.json"
|
||
if not snap_path.exists():
|
||
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
|
||
|
||
with open(snap_path) as f:
|
||
data = json.load(f)
|
||
|
||
previous_count = len(self._entries)
|
||
self._entries = {}
|
||
for entry_data in data.get("entries", []):
|
||
entry = ArchiveEntry.from_dict(entry_data)
|
||
self._entries[entry.id] = entry
|
||
|
||
self._save()
|
||
return {
|
||
"snapshot_id": snapshot_id,
|
||
"restored_count": len(self._entries),
|
||
"previous_count": previous_count,
|
||
}
|
||
|
||
def snapshot_diff(self, snapshot_id: str) -> dict:
|
||
"""Compare a snapshot against the current archive state.
|
||
|
||
Args:
|
||
snapshot_id: The snapshot_id to compare against current state.
|
||
|
||
Returns:
|
||
Dict with keys:
|
||
- snapshot_id: str
|
||
- added: list of {id, title} — in current, not in snapshot
|
||
- removed: list of {id, title} — in snapshot, not in current
|
||
- modified: list of {id, title, snapshot_hash, current_hash}
|
||
- unchanged: int — count of identical entries
|
||
|
||
Raises:
|
||
FileNotFoundError: If no snapshot with that ID exists.
|
||
"""
|
||
snap_dir = self._snapshot_dir()
|
||
snap_path = snap_dir / f"{snapshot_id}.json"
|
||
if not snap_path.exists():
|
||
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
|
||
|
||
with open(snap_path) as f:
|
||
data = json.load(f)
|
||
|
||
snap_entries: dict[str, dict] = {}
|
||
for entry_data in data.get("entries", []):
|
||
snap_entries[entry_data["id"]] = entry_data
|
||
|
||
current_ids = set(self._entries.keys())
|
||
snap_ids = set(snap_entries.keys())
|
||
|
||
added = []
|
||
for eid in current_ids - snap_ids:
|
||
e = self._entries[eid]
|
||
added.append({"id": e.id, "title": e.title})
|
||
|
||
removed = []
|
||
for eid in snap_ids - current_ids:
|
||
snap_e = snap_entries[eid]
|
||
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
|
||
|
||
modified = []
|
||
unchanged = 0
|
||
for eid in current_ids & snap_ids:
|
||
current_hash = self._entries[eid].content_hash
|
||
snap_hash = snap_entries[eid].get("content_hash")
|
||
if current_hash != snap_hash:
|
||
modified.append({
|
||
"id": eid,
|
||
"title": self._entries[eid].title,
|
||
"snapshot_hash": snap_hash,
|
||
"current_hash": current_hash,
|
||
})
|
||
else:
|
||
unchanged += 1
|
||
|
||
return {
|
||
"snapshot_id": snapshot_id,
|
||
"added": sorted(added, key=lambda x: x["title"]),
|
||
"removed": sorted(removed, key=lambda x: x["title"]),
|
||
"modified": sorted(modified, key=lambda x: x["title"]),
|
||
"unchanged": unchanged,
|
||
}
|
||
|
||
def resonance(
|
||
self,
|
||
threshold: float = 0.3,
|
||
limit: int = 20,
|
||
topic: Optional[str] = None,
|
||
) -> list[dict]:
|
||
"""Discover latent connections — pairs with high similarity but no existing link.
|
||
|
||
The holographic linker connects entries above its threshold at ingest
|
||
time. ``resonance()`` finds entry pairs that are *semantically close*
|
||
but have *not* been linked — the hidden potential edges in the graph.
|
||
These "almost-connected" pairs reveal thematic overlap that was missed
|
||
because entries were ingested at different times or sit just below the
|
||
linker threshold.
|
||
|
||
Args:
|
||
threshold: Minimum similarity score to surface a pair (default 0.3).
|
||
Pairs already linked are excluded regardless of score.
|
||
limit: Maximum number of pairs to return (default 20).
|
||
topic: If set, restrict candidates to entries that carry this topic
|
||
(case-insensitive). Both entries in a pair must match.
|
||
|
||
Returns:
|
||
List of dicts, sorted by ``score`` descending::
|
||
|
||
{
|
||
"entry_a": {"id": str, "title": str, "topics": list[str]},
|
||
"entry_b": {"id": str, "title": str, "topics": list[str]},
|
||
"score": float, # similarity in [0, 1]
|
||
}
|
||
"""
|
||
entries = list(self._entries.values())
|
||
|
||
if topic:
|
||
topic_lower = topic.lower()
|
||
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
|
||
|
||
results: list[dict] = []
|
||
|
||
for i, entry_a in enumerate(entries):
|
||
for entry_b in entries[i + 1:]:
|
||
# Skip pairs that are already linked
|
||
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
|
||
continue
|
||
|
||
score = self.linker.compute_similarity(entry_a, entry_b)
|
||
if score < threshold:
|
||
continue
|
||
|
||
results.append({
|
||
"entry_a": {
|
||
"id": entry_a.id,
|
||
"title": entry_a.title,
|
||
"topics": entry_a.topics,
|
||
},
|
||
"entry_b": {
|
||
"id": entry_b.id,
|
||
"title": entry_b.title,
|
||
"topics": entry_b.topics,
|
||
},
|
||
"score": round(score, 4),
|
||
})
|
||
|
||
results.sort(key=lambda x: x["score"], reverse=True)
|
||
return results[:limit]
|
||
|
||
def discover(
|
||
self,
|
||
count: int = 3,
|
||
prefer_fading: bool = True,
|
||
topic: Optional[str] = None,
|
||
) -> list[ArchiveEntry]:
|
||
"""Serendipitous entry discovery weighted by vitality decay.
|
||
|
||
Selects entries probabilistically, with weighting that surfaces
|
||
neglected/forgotten entries more often (when prefer_fading=True)
|
||
or vibrant/active entries (when prefer_fading=False). Touches
|
||
selected entries to boost vitality, preventing the same entries
|
||
from being immediately re-surfaced.
|
||
|
||
Args:
|
||
count: Number of entries to discover (default 3).
|
||
prefer_fading: If True (default), weight toward fading entries.
|
||
If False, weight toward vibrant entries.
|
||
topic: If set, restrict to entries with this topic (case-insensitive).
|
||
|
||
Returns:
|
||
List of ArchiveEntry, up to count entries.
|
||
"""
|
||
import random
|
||
|
||
candidates = list(self._entries.values())
|
||
|
||
if not candidates:
|
||
return []
|
||
|
||
if topic:
|
||
topic_lower = topic.lower()
|
||
candidates = [e for e in candidates if topic_lower in [t.lower() for t in e.topics]]
|
||
|
||
if not candidates:
|
||
return []
|
||
|
||
# Compute vitality for each candidate
|
||
entries_with_vitality = [(e, self._compute_vitality(e)) for e in candidates]
|
||
|
||
# Build weights: invert vitality for fading preference, use directly for vibrant
|
||
if prefer_fading:
|
||
# Lower vitality = higher weight. Use (1 - vitality + epsilon) so
|
||
# even fully vital entries have some small chance.
|
||
weights = [1.0 - v + 0.01 for _, v in entries_with_vitality]
|
||
else:
|
||
# Higher vitality = higher weight. Use (vitality + epsilon).
|
||
weights = [v + 0.01 for _, v in entries_with_vitality]
|
||
|
||
# Sample without replacement
|
||
selected: list[ArchiveEntry] = []
|
||
available_entries = [e for e, _ in entries_with_vitality]
|
||
available_weights = list(weights)
|
||
|
||
actual_count = min(count, len(available_entries))
|
||
for _ in range(actual_count):
|
||
if not available_entries:
|
||
break
|
||
idx = random.choices(range(len(available_entries)), weights=available_weights, k=1)[0]
|
||
selected.append(available_entries.pop(idx))
|
||
available_weights.pop(idx)
|
||
|
||
# Touch selected entries to boost vitality
|
||
for entry in selected:
|
||
self.touch(entry.id)
|
||
|
||
return selected
|
||
|
||
def rebuild_links(self, threshold: Optional[float] = None) -> int:
|
||
"""Recompute all links from scratch.
|
||
|
||
Clears existing links and re-applies the holographic linker to every
|
||
entry pair. Useful after bulk ingestion or threshold changes.
|
||
|
||
Args:
|
||
threshold: Override the linker's default similarity threshold.
|
||
|
||
Returns:
|
||
Total number of links created.
|
||
"""
|
||
if threshold is not None:
|
||
old_threshold = self.linker.threshold
|
||
self.linker.threshold = threshold
|
||
|
||
# Clear all links
|
||
for entry in self._entries.values():
|
||
entry.links = []
|
||
|
||
entries = list(self._entries.values())
|
||
total_links = 0
|
||
|
||
# Re-link each entry against all others
|
||
for entry in entries:
|
||
candidates = [e for e in entries if e.id != entry.id]
|
||
new_links = self.linker.apply_links(entry, candidates)
|
||
total_links += new_links
|
||
|
||
if threshold is not None:
|
||
self.linker.threshold = old_threshold
|
||
|
||
self._save()
|
||
return total_links
|