Compare commits

..

17 Commits

Author SHA1 Message Date
Alexander Whitestone
d6b7d9137b feat: wire MemoryPulse to crystal click handler
Import and initialize MemoryPulse in app.js. Trigger pulse wave
on every crystal click alongside MemoryInspect panel.

Closes #1263
2026-04-12 02:40:36 -04:00
Alexander Whitestone
8d7930de31 feat: add memory-pulse.js — BFS wave animation engine
Implements the memory_pulse feature from FEATURES.yaml.
Visual pulse wave radiates through connection graph when a
crystal is clicked, illuminating linked memories by BFS hop
distance with expanding ring effects.

Closes #1263
2026-04-12 02:40:03 -04:00
b52c7281f0 [claude] Mnemosyne: memory consolidation — auto-merge duplicates (#1260) (#1262)
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 2s
2026-04-12 06:24:24 +00:00
af1221fb80 auto
Some checks failed
Deploy Nexus / deploy (push) Failing after 2s
Staging Verification Gate / verify-staging (push) Failing after 2s
auto
2026-04-12 06:08:51 +00:00
42a4169940 docs(mnemosyne): mark memory_decay as shipped
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 13s
Review Approval Gate / verify-review (pull_request) Failing after 3s
Part of #1258.
2026-04-12 05:43:30 +00:00
3f7c037562 test(mnemosyne): add memory decay test suite
Part of #1258.
- Test vitality fields on entry model
- Test touch() access recording and boost
- Test compute_vitality decay math
- Test fading/vibrant queries
- Test apply_decay bulk operation
- Test stats integration
- Integration lifecycle test
2026-04-12 05:43:28 +00:00
17e714c9d2 feat(mnemosyne): add memory decay system to MnemosyneArchive
Part of #1258.
- touch(entry_id): record access, boost vitality
- get_vitality(entry_id): current vitality status  
- fading(limit): most neglected entries
- vibrant(limit): most alive entries
- apply_decay(): decay all entries, persist
- stats() updated with avg_vitality, fading_count, vibrant_count

Decay: exponential with 30-day half-life.
Touch: 0.1 * (1 - current_vitality) — diminishing returns.
2026-04-12 05:42:37 +00:00
653c20862c feat(mnemosyne): add vitality and last_accessed fields to ArchiveEntry
Part of #1258 — memory decay system.
- vitality: float 0.0-1.0 (default 1.0)
- last_accessed: ISO datetime of last access

Also ensures updated_at and content_hash fields from main are present.
2026-04-12 05:41:42 +00:00
89e19dbaa2 Merge PR #1257
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 4s
Auto-merged by Timmy PR review cron job
2026-04-12 05:30:03 +00:00
3fca28b1c8 feat: export embedding backends from mnemosyne __init__
Some checks failed
CI / test (pull_request) Failing after 10s
CI / validate (pull_request) Failing after 15s
Review Approval Gate / verify-review (pull_request) Failing after 2s
2026-04-12 05:07:55 +00:00
1f8994abc9 docs: mark embedding_backend as shipped in FEATURES.yaml 2026-04-12 05:07:29 +00:00
fcdb049117 feat: CLI --backend flag for embedding backend selection
- search: --backend ollama|tfidf|auto
- rebuild: --backend flag
- Auto-detects best backend when --semantic is used
2026-04-12 05:07:14 +00:00
85dda06ff0 test: add embedding backend test suite
Tests cosine similarity, TF-IDF backend,
auto-detection, and fallback behavior.
2026-04-12 05:06:24 +00:00
bd27cd4bf5 feat: archive.py uses embedding backend for semantic search
- MnemosyneArchive.__init__ accepts optional EmbeddingBackend
- Auto-detects best backend via get_embedding_backend()
- semantic_search uses embedding cosine similarity when available
- Falls back to Jaccard token similarity gracefully
2026-04-12 05:06:00 +00:00
fd7c66bd54 feat: linker supports pluggable embedding backend
HolographicLinker now accepts optional EmbeddingBackend.
Uses cosine similarity on embeddings when available,
falls back to Jaccard token similarity otherwise.
Embedding cache for performance during link operations.
2026-04-12 05:05:17 +00:00
3bf8d6e0a6 feat: add pluggable embedding backend (Ollama + TF-IDF)
Implements embedding_backend from FEATURES.yaml:
- Abstract EmbeddingBackend interface
- OllamaEmbeddingBackend for local sovereign models
- TfidfEmbeddingBackend pure-Python fallback
- get_embedding_backend() auto-detection
2026-04-12 05:04:53 +00:00
eeba35b3a9 Merge pull request '[EPIC] IaC Workflow — .gitignore fix, stale PR closer, FEATURES.yaml, CONTRIBUTING.md' (#1254) from epic/iac-workflow-1248 into main
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 2s
2026-04-12 04:51:44 +00:00
12 changed files with 1468 additions and 40 deletions

3
app.js
View File

@@ -7,6 +7,7 @@ import { SpatialMemory } from './nexus/components/spatial-memory.js';
import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
@@ -715,6 +716,7 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(scene);
updateLoad(90);
loadSession();
@@ -1947,6 +1949,7 @@ function setupControls() {
SpatialMemory.highlightMemory(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
MemoryPulse.trigger(entry.data.id, SpatialMemory);
}
} else {
// Clicked empty space — close inspect panel and deselect crystal

View File

@@ -0,0 +1,256 @@
// ═══════════════════════════════════════════════════════════
// MNEMOSYNE — Memory Pulse
// ═══════════════════════════════════════════════════════════
//
// Visual pulse wave that radiates through the connection graph
// when a memory crystal is clicked. Illuminates linked memories
// by BFS hop distance — closer neighbors light up first.
//
// Usage from app.js:
// import { MemoryPulse } from './nexus/components/memory-pulse.js';
// MemoryPulse.init(scene);
// MemoryPulse.trigger(clickedMemId, SpatialMemory);
//
// Depends on: SpatialMemory (getAllMemories, getMemoryFromMesh)
// ═══════════════════════════════════════════════════════════
const MemoryPulse = (() => {
let _scene = null;
let _activePulses = []; // track running animations for cleanup
const HOP_DELAY = 300; // ms between each BFS hop wave
const GLOW_DURATION = 800; // ms each crystal glows at peak
const FADE_DURATION = 600; // ms to fade back to normal
const PULSE_COLOR = 0x4af0c0; // cyan-green pulse glow
const PULSE_INTENSITY = 6.0; // peak emissive during pulse
const RING_DURATION = 1200; // ms for the expanding ring effect
// ─── INIT ────────────────────────────────────────────────
function init(scene) {
_scene = scene;
}
// ─── BFS TRAVERSAL ───────────────────────────────────────
// Returns array of arrays: [[hop-0 ids], [hop-1 ids], [hop-2 ids], ...]
function bfsHops(startId, allMemories) {
const memMap = {};
for (const m of allMemories) {
memMap[m.id] = m;
}
if (!memMap[startId]) return [];
const visited = new Set([startId]);
const hops = [];
let frontier = [startId];
while (frontier.length > 0) {
hops.push([...frontier]);
const next = [];
for (const id of frontier) {
const mem = memMap[id];
if (!mem || !mem.connections) continue;
for (const connId of mem.connections) {
if (!visited.has(connId)) {
visited.add(connId);
next.push(connId);
}
}
}
frontier = next;
}
return hops;
}
// ─── EXPANDING RING ──────────────────────────────────────
// Creates a flat ring geometry that expands outward from a position
function createExpandingRing(position, color) {
const ringGeo = new THREE.RingGeometry(0.1, 0.2, 32);
const ringMat = new THREE.MeshBasicMaterial({
color: color,
transparent: true,
opacity: 0.8,
side: THREE.DoubleSide,
depthWrite: false
});
const ring = new THREE.Mesh(ringGeo, ringMat);
ring.position.copy(position);
ring.position.y += 0.1; // slightly above crystal
ring.rotation.x = -Math.PI / 2; // flat horizontal
ring.scale.set(0.1, 0.1, 0.1);
_scene.add(ring);
return ring;
}
// ─── ANIMATE RING ────────────────────────────────────────
function animateRing(ring, onComplete) {
const startTime = performance.now();
function tick() {
const elapsed = performance.now() - startTime;
const t = Math.min(1, elapsed / RING_DURATION);
// Expand outward
const scale = 0.1 + t * 4.0;
ring.scale.set(scale, scale, scale);
// Fade out
ring.material.opacity = 0.8 * (1 - t * t);
if (t < 1) {
requestAnimationFrame(tick);
} else {
_scene.remove(ring);
ring.geometry.dispose();
ring.material.dispose();
if (onComplete) onComplete();
}
}
requestAnimationFrame(tick);
}
// ─── PULSE CRYSTAL GLOW ──────────────────────────────────
// Temporarily boosts a crystal's emissive intensity
function pulseGlow(mesh, hopIndex) {
if (!mesh || !mesh.material) return;
const originalIntensity = mesh.material.emissiveIntensity;
const originalColor = mesh.material.emissive ? mesh.material.emissive.clone() : null;
const delay = hopIndex * HOP_DELAY;
setTimeout(() => {
if (!mesh.material) return;
// Store original for restore
const origInt = mesh.material.emissiveIntensity;
// Flash to pulse color
if (mesh.material.emissive) {
mesh.material.emissive.setHex(PULSE_COLOR);
}
mesh.material.emissiveIntensity = PULSE_INTENSITY;
// Also boost point light if present
let origLightIntensity = null;
let origLightColor = null;
if (mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
origLightIntensity = child.intensity;
origLightColor = child.color.clone();
child.intensity = 3.0;
child.color.setHex(PULSE_COLOR);
}
}
}
// Hold at peak, then fade
setTimeout(() => {
const fadeStart = performance.now();
function fadeTick() {
const elapsed = performance.now() - fadeStart;
const t = Math.min(1, elapsed / FADE_DURATION);
const eased = 1 - (1 - t) * (1 - t); // ease-out quad
mesh.material.emissiveIntensity = PULSE_INTENSITY + (origInt - PULSE_INTENSITY) * eased;
if (originalColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
mesh.material.emissive.setRGB(
pr + (originalColor.r - pr) * eased,
pg + (originalColor.g - pg) * eased,
pb + (originalColor.b - pb) * eased
);
}
// Restore point light
if (origLightIntensity !== null && mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
child.intensity = 3.0 + (origLightIntensity - 3.0) * eased;
if (origLightColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
child.color.setRGB(
pr + (origLightColor.r - pr) * eased,
pg + (origLightColor.g - pg) * eased,
pb + (origLightColor.b - pb) * eased
);
}
}
}
}
if (t < 1) {
requestAnimationFrame(fadeTick);
}
}
requestAnimationFrame(fadeTick);
}, GLOW_DURATION);
}, delay);
}
// ─── TRIGGER ─────────────────────────────────────────────
// Main entry point: fire a pulse wave from the given memory ID
function trigger(memId, spatialMemory) {
if (!_scene) return;
const allMemories = spatialMemory.getAllMemories();
const hops = bfsHops(memId, allMemories);
if (hops.length <= 1) {
// No connections — just do a local ring
const obj = spatialMemory.getMemoryFromMesh(
spatialMemory.getCrystalMeshes().find(m => m.userData.memId === memId)
);
if (obj && obj.mesh) {
const ring = createExpandingRing(obj.mesh.position, PULSE_COLOR);
animateRing(ring);
}
return;
}
// For each hop level, create expanding rings and pulse glows
for (let hopIdx = 0; hopIdx < hops.length; hopIdx++) {
const idsInHop = hops[hopIdx];
for (const id of idsInHop) {
// Find mesh for this memory
const meshes = spatialMemory.getCrystalMeshes();
let targetMesh = null;
for (const m of meshes) {
if (m.userData && m.userData.memId === id) {
targetMesh = m;
break;
}
}
if (!targetMesh) continue;
// Schedule pulse glow
pulseGlow(targetMesh, hopIdx);
// Create expanding ring at this hop's delay
((mesh, delay) => {
setTimeout(() => {
const ring = createExpandingRing(mesh.position, PULSE_COLOR);
animateRing(ring);
}, delay * HOP_DELAY);
})(targetMesh, hopIdx);
}
}
}
// ─── CLEANUP ─────────────────────────────────────────────
function dispose() {
// Active pulses will self-clean via their animation callbacks
_activePulses = [];
}
return { init, trigger, dispose, bfsHops };
})();
export { MemoryPulse };

View File

@@ -67,7 +67,7 @@ modules:
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
tests:
status: shipped
@@ -151,13 +151,16 @@ frontend:
planned:
memory_decay:
status: planned
status: shipped
files: [entry.py, archive.py]
description: >
Memories have living energy that fades with neglect and
brightens with access. Vitality score based on access
frequency and recency. Was attempted in PR #1221 but
went stale — needs fresh implementation against current main.
frequency and recency. Exponential decay with 30-day half-life.
Touch boost with diminishing returns.
priority: medium
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: planned
@@ -168,17 +171,23 @@ planned:
priority: medium
embedding_backend:
status: planned
status: shipped
files: [embeddings.py]
description: >
Pluggable embedding backend for true semantic search
(replacing Jaccard token similarity). Support local models
via Ollama for sovereignty.
Pluggable embedding backend for true semantic search.
Supports Ollama (local models) and TF-IDF fallback.
Auto-detects best available backend.
priority: high
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_consolidation:
status: planned
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]
description: >
Automatic merging of duplicate/near-duplicate memories
using content_hash and semantic similarity. Periodic
consolidation pass.
priority: low
merged_prs:
- "#1260"

View File

@@ -14,6 +14,12 @@ from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
TfidfEmbeddingBackend,
get_embedding_backend,
)
__all__ = [
"MnemosyneArchive",
@@ -21,4 +27,8 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",
"get_embedding_backend",
]

View File

@@ -13,6 +13,7 @@ from typing import Optional
from nexus.mnemosyne.entry import ArchiveEntry, _compute_content_hash
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.embeddings import get_embedding_backend, EmbeddingBackend
_EXPORT_VERSION = "1"
@@ -24,10 +25,21 @@ class MnemosyneArchive:
MemPalace (ChromaDB) for vector-semantic search.
"""
def __init__(self, archive_path: Optional[Path] = None):
def __init__(
self,
archive_path: Optional[Path] = None,
embedding_backend: Optional[EmbeddingBackend] = None,
auto_embed: bool = True,
):
self.path = archive_path or Path.home() / ".hermes" / "mnemosyne" / "archive.json"
self.path.parent.mkdir(parents=True, exist_ok=True)
self.linker = HolographicLinker()
self._embedding_backend = embedding_backend
if embedding_backend is None and auto_embed:
try:
self._embedding_backend = get_embedding_backend()
except Exception:
self._embedding_backend = None
self.linker = HolographicLinker(embedding_backend=self._embedding_backend)
self._entries: dict[str, ArchiveEntry] = {}
self._load()
@@ -143,33 +155,51 @@ class MnemosyneArchive:
return [e for _, e in scored[:limit]]
def semantic_search(self, query: str, limit: int = 10, threshold: float = 0.05) -> list[ArchiveEntry]:
"""Semantic search using holographic linker similarity.
"""Semantic search using embeddings or holographic linker similarity.
Scores each entry by Jaccard similarity between query tokens and entry
tokens, then boosts entries with more inbound links (more "holographic").
Falls back to keyword search if no entries meet the similarity threshold.
With an embedding backend: cosine similarity between query vector and
entry vectors, boosted by inbound link count.
Without: Jaccard similarity on tokens with link boost.
Falls back to keyword search if nothing meets the threshold.
Args:
query: Natural language query string.
limit: Maximum number of results to return.
threshold: Minimum Jaccard similarity to be considered a semantic match.
threshold: Minimum similarity score to include in results.
Returns:
List of ArchiveEntry sorted by combined relevance score, descending.
"""
query_tokens = HolographicLinker._tokenize(query)
if not query_tokens:
return []
# Count inbound links for each entry (how many entries link TO this one)
# Count inbound links for link-boost
inbound: dict[str, int] = {eid: 0 for eid in self._entries}
for entry in self._entries.values():
for linked_id in entry.links:
if linked_id in inbound:
inbound[linked_id] += 1
max_inbound = max(inbound.values(), default=1) or 1
# Try embedding-based search first
if self._embedding_backend:
query_vec = self._embedding_backend.embed(query)
if query_vec:
scored = []
for entry in self._entries.values():
text = f"{entry.title} {entry.content} {' '.join(entry.topics)}"
entry_vec = self._embedding_backend.embed(text)
if not entry_vec:
continue
sim = self._embedding_backend.similarity(query_vec, entry_vec)
if sim >= threshold:
link_boost = inbound[entry.id] / max_inbound * 0.15
scored.append((sim + link_boost, entry))
if scored:
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
# Fallback: Jaccard token similarity
query_tokens = HolographicLinker._tokenize(query)
if not query_tokens:
return []
scored = []
for entry in self._entries.values():
entry_tokens = HolographicLinker._tokenize(f"{entry.title} {entry.content} {' '.join(entry.topics)}")
@@ -179,14 +209,13 @@ class MnemosyneArchive:
union = query_tokens | entry_tokens
jaccard = len(intersection) / len(union)
if jaccard >= threshold:
link_boost = inbound[entry.id] / max_inbound * 0.2 # up to 20% boost
link_boost = inbound[entry.id] / max_inbound * 0.2
scored.append((jaccard + link_boost, entry))
if scored:
scored.sort(key=lambda x: x[0], reverse=True)
return [e for _, e in scored[:limit]]
# Graceful fallback to keyword search
# Final fallback: keyword search
return self.search(query, limit=limit)
def get_linked(self, entry_id: str, depth: int = 1) -> list[ArchiveEntry]:
@@ -360,6 +389,17 @@ class MnemosyneArchive:
oldest_entry = timestamps[0] if timestamps else None
newest_entry = timestamps[-1] if timestamps else None
# Vitality summary
if n > 0:
vitalities = [self._compute_vitality(e) for e in entries]
avg_vitality = round(sum(vitalities) / n, 4)
fading_count = sum(1 for v in vitalities if v < 0.3)
vibrant_count = sum(1 for v in vitalities if v > 0.7)
else:
avg_vitality = 0.0
fading_count = 0
vibrant_count = 0
return {
"entries": n,
"total_links": total_links,
@@ -369,6 +409,9 @@ class MnemosyneArchive:
"link_density": link_density,
"oldest_entry": oldest_entry,
"newest_entry": newest_entry,
"avg_vitality": avg_vitality,
"fading_count": fading_count,
"vibrant_count": vibrant_count,
}
def _build_adjacency(self) -> dict[str, set[str]]:
@@ -713,6 +756,309 @@ class MnemosyneArchive:
results.sort(key=lambda e: e.created_at)
return results
# ─── Memory Decay ─────────────────────────────────────────
# Decay parameters
_DECAY_HALF_LIFE_DAYS: float = 30.0 # Half-life for exponential decay
_TOUCH_BOOST_FACTOR: float = 0.1 # Base boost on access (diminishes as vitality → 1.0)
def touch(self, entry_id: str) -> ArchiveEntry:
"""Record an access to an entry, boosting its vitality.
The boost is ``_TOUCH_BOOST_FACTOR * (1 - current_vitality)`` —
diminishing returns as vitality approaches 1.0 ensures entries
can never exceed 1.0 through touch alone.
Args:
entry_id: ID of the entry to touch.
Returns:
The updated ArchiveEntry.
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
now = datetime.now(timezone.utc).isoformat()
# Compute current decayed vitality before boosting
current = self._compute_vitality(entry)
boost = self._TOUCH_BOOST_FACTOR * (1.0 - current)
entry.vitality = min(1.0, current + boost)
entry.last_accessed = now
self._save()
return entry
def _compute_vitality(self, entry: ArchiveEntry) -> float:
"""Compute the current vitality of an entry based on time decay.
Uses exponential decay: ``v = base * 0.5 ^ (hours_since_access / half_life_hours)``
If the entry has never been accessed, uses ``created_at`` as the
reference point. New entries with no access start at full vitality.
Args:
entry: The archive entry.
Returns:
Current vitality as a float in [0.0, 1.0].
"""
if entry.last_accessed is None:
# Never accessed — check age from creation
created = self._parse_dt(entry.created_at)
hours_elapsed = (datetime.now(timezone.utc) - created).total_seconds() / 3600
else:
last = self._parse_dt(entry.last_accessed)
hours_elapsed = (datetime.now(timezone.utc) - last).total_seconds() / 3600
half_life_hours = self._DECAY_HALF_LIFE_DAYS * 24
if hours_elapsed <= 0 or half_life_hours <= 0:
return entry.vitality
decayed = entry.vitality * (0.5 ** (hours_elapsed / half_life_hours))
return max(0.0, min(1.0, decayed))
def get_vitality(self, entry_id: str) -> dict:
"""Get the current vitality status of an entry.
Args:
entry_id: ID of the entry.
Returns:
Dict with keys: entry_id, title, vitality, last_accessed, age_days
Raises:
KeyError: If entry_id does not exist.
"""
entry = self._entries.get(entry_id)
if entry is None:
raise KeyError(entry_id)
current_vitality = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
return {
"entry_id": entry.id,
"title": entry.title,
"vitality": round(current_vitality, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
}
def fading(self, limit: int = 10) -> list[dict]:
"""Return entries with the lowest vitality (most neglected).
Args:
limit: Maximum number of entries to return.
Returns:
List of dicts sorted by vitality ascending (most faded first).
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
"""
scored = []
for entry in self._entries.values():
v = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
scored.append({
"entry_id": entry.id,
"title": entry.title,
"vitality": round(v, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
})
scored.sort(key=lambda x: x["vitality"])
return scored[:limit]
def vibrant(self, limit: int = 10) -> list[dict]:
"""Return entries with the highest vitality (most alive).
Args:
limit: Maximum number of entries to return.
Returns:
List of dicts sorted by vitality descending (most vibrant first).
Each dict has keys: entry_id, title, vitality, last_accessed, age_days
"""
scored = []
for entry in self._entries.values():
v = self._compute_vitality(entry)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
scored.append({
"entry_id": entry.id,
"title": entry.title,
"vitality": round(v, 4),
"last_accessed": entry.last_accessed,
"age_days": age_days,
})
scored.sort(key=lambda x: x["vitality"], reverse=True)
return scored[:limit]
def apply_decay(self) -> dict:
"""Apply time-based decay to all entries and persist.
Recomputes each entry's vitality based on elapsed time since
its last access (or creation if never accessed). Saves the
archive after updating.
Returns:
Dict with keys: total_entries, decayed_count, avg_vitality,
fading_count (entries below 0.3), vibrant_count (entries above 0.7)
"""
decayed = 0
total_vitality = 0.0
fading_count = 0
vibrant_count = 0
for entry in self._entries.values():
old_v = entry.vitality
new_v = self._compute_vitality(entry)
if abs(new_v - old_v) > 1e-6:
entry.vitality = new_v
decayed += 1
total_vitality += entry.vitality
if entry.vitality < 0.3:
fading_count += 1
if entry.vitality > 0.7:
vibrant_count += 1
n = len(self._entries)
self._save()
return {
"total_entries": n,
"decayed_count": decayed,
"avg_vitality": round(total_vitality / n, 4) if n else 0.0,
"fading_count": fading_count,
"vibrant_count": vibrant_count,
}
def consolidate(
self,
threshold: float = 0.9,
dry_run: bool = False,
) -> list[dict]:
"""Scan the archive and merge duplicate/near-duplicate entries.
Two entries are considered duplicates if:
- They share the same ``content_hash`` (exact duplicate), or
- Their similarity score (via HolographicLinker) exceeds ``threshold``
(near-duplicate when an embedding backend is available or Jaccard is
high enough at the given threshold).
Merge strategy:
- Keep the *older* entry (earlier ``created_at``).
- Union topics from both entries (case-deduped).
- Merge metadata from newer into older (older values win on conflicts).
- Transfer all links from the newer entry to the older entry.
- Delete the newer entry.
Args:
threshold: Similarity threshold for near-duplicate detection (0.01.0).
Default 0.9 is intentionally conservative.
dry_run: If True, return the list of would-be merges without mutating
the archive.
Returns:
List of dicts, one per merged pair::
{
"kept": <entry_id of survivor>,
"removed": <entry_id of duplicate>,
"reason": "exact_hash" | "semantic_similarity",
"score": float, # 1.0 for exact hash matches
"dry_run": bool,
}
"""
merges: list[dict] = []
entries = list(self._entries.values())
removed_ids: set[str] = set()
for i, entry_a in enumerate(entries):
if entry_a.id in removed_ids:
continue
for entry_b in entries[i + 1:]:
if entry_b.id in removed_ids:
continue
# Determine if they are duplicates
reason: Optional[str] = None
score: float = 0.0
if (
entry_a.content_hash is not None
and entry_b.content_hash is not None
and entry_a.content_hash == entry_b.content_hash
):
reason = "exact_hash"
score = 1.0
else:
sim = self.linker.compute_similarity(entry_a, entry_b)
if sim >= threshold:
reason = "semantic_similarity"
score = sim
if reason is None:
continue
# Decide which entry to keep (older survives)
if entry_a.created_at <= entry_b.created_at:
kept, removed = entry_a, entry_b
else:
kept, removed = entry_b, entry_a
merges.append({
"kept": kept.id,
"removed": removed.id,
"reason": reason,
"score": round(score, 4),
"dry_run": dry_run,
})
if not dry_run:
# Merge topics (case-deduped)
existing_lower = {t.lower() for t in kept.topics}
for tag in removed.topics:
if tag.lower() not in existing_lower:
kept.topics.append(tag)
existing_lower.add(tag.lower())
# Merge metadata (kept wins on key conflicts)
for k, v in removed.metadata.items():
if k not in kept.metadata:
kept.metadata[k] = v
# Transfer links: add removed's links to kept
kept_links_set = set(kept.links)
for lid in removed.links:
if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
kept.links.append(lid)
kept_links_set.add(lid)
# Update the other entry's back-link
other = self._entries.get(lid)
if other and kept.id not in other.links:
other.links.append(kept.id)
# Remove back-links pointing at the removed entry
for other in self._entries.values():
if removed.id in other.links:
other.links.remove(removed.id)
if other.id != kept.id and kept.id not in other.links:
other.links.append(kept.id)
del self._entries[removed.id]
removed_ids.add(removed.id)
if not dry_run and merges:
self._save()
return merges
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -25,7 +25,16 @@ def cmd_stats(args):
def cmd_search(args):
archive = MnemosyneArchive()
from nexus.mnemosyne.embeddings import get_embedding_backend
backend = None
if getattr(args, "backend", "auto") != "auto":
backend = get_embedding_backend(prefer=args.backend)
elif getattr(args, "semantic", False):
try:
backend = get_embedding_backend()
except Exception:
pass
archive = MnemosyneArchive(embedding_backend=backend)
if getattr(args, "semantic", False):
results = archive.semantic_search(args.query, limit=args.limit)
else:
@@ -197,6 +206,23 @@ def cmd_timeline(args):
print()
def cmd_consolidate(args):
archive = MnemosyneArchive()
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
if not merges:
print("No duplicates found.")
return
label = "[DRY RUN] " if args.dry_run else ""
for m in merges:
print(f"{label}Merge ({m['reason']}, score={m['score']:.4f}):")
print(f" kept: {m['kept'][:8]}")
print(f" removed: {m['removed'][:8]}")
if args.dry_run:
print(f"\n{len(merges)} pair(s) would be merged. Re-run without --dry-run to apply.")
else:
print(f"\nMerged {len(merges)} duplicate pair(s).")
def cmd_neighbors(args):
archive = MnemosyneArchive()
try:
@@ -274,6 +300,10 @@ def main():
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
args = parser.parse_args()
if not args.command:
parser.print_help()
@@ -296,6 +326,7 @@ def main():
"retag": cmd_retag,
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
}
dispatch[args.command](args)

View File

@@ -0,0 +1,170 @@
"""Pluggable embedding backends for Mnemosyne semantic search.
Provides an abstract EmbeddingBackend interface and concrete implementations:
- OllamaEmbeddingBackend: local models via Ollama (sovereign, no cloud)
- TfidfEmbeddingBackend: pure-Python TF-IDF fallback (no dependencies)
Usage:
from nexus.mnemosyne.embeddings import get_embedding_backend
backend = get_embedding_backend() # auto-detects best available
vec = backend.embed("hello world")
score = backend.similarity(vec_a, vec_b)
"""
from __future__ import annotations
import abc, json, math, os, re, urllib.request
from typing import Optional
class EmbeddingBackend(abc.ABC):
"""Abstract interface for embedding-based similarity."""
@abc.abstractmethod
def embed(self, text: str) -> list[float]:
"""Return an embedding vector for the given text."""
@abc.abstractmethod
def similarity(self, a: list[float], b: list[float]) -> float:
"""Return cosine similarity between two vectors, in [0, 1]."""
@property
def name(self) -> str:
return self.__class__.__name__
@property
def dimension(self) -> int:
return 0
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Cosine similarity between two vectors."""
if len(a) != len(b):
raise ValueError(f"Vector dimension mismatch: {len(a)} vs {len(b)}")
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x * x for x in a))
norm_b = math.sqrt(sum(x * x for x in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
class OllamaEmbeddingBackend(EmbeddingBackend):
"""Embedding backend using a local Ollama instance.
Default model: nomic-embed-text (768 dims)."""
def __init__(self, base_url: str | None = None, model: str | None = None):
self.base_url = base_url or os.environ.get("OLLAMA_URL", "http://localhost:11434")
self.model = model or os.environ.get("MNEMOSYNE_EMBED_MODEL", "nomic-embed-text")
self._dim: int = 0
self._available: bool | None = None
def _check_available(self) -> bool:
if self._available is not None:
return self._available
try:
req = urllib.request.Request(f"{self.base_url}/api/tags", method="GET")
resp = urllib.request.urlopen(req, timeout=3)
tags = json.loads(resp.read())
models = [m["name"].split(":")[0] for m in tags.get("models", [])]
self._available = any(self.model in m for m in models)
except Exception:
self._available = False
return self._available
@property
def name(self) -> str:
return f"Ollama({self.model})"
@property
def dimension(self) -> int:
return self._dim
def embed(self, text: str) -> list[float]:
if not self._check_available():
raise RuntimeError(f"Ollama not available or model {self.model} not found")
data = json.dumps({"model": self.model, "prompt": text}).encode()
req = urllib.request.Request(
f"{self.base_url}/api/embeddings", data=data,
headers={"Content-Type": "application/json"}, method="POST")
resp = urllib.request.urlopen(req, timeout=30)
result = json.loads(resp.read())
vec = result.get("embedding", [])
if vec:
self._dim = len(vec)
return vec
def similarity(self, a: list[float], b: list[float]) -> float:
raw = cosine_similarity(a, b)
return (raw + 1.0) / 2.0
class TfidfEmbeddingBackend(EmbeddingBackend):
"""Pure-Python TF-IDF embedding. No dependencies. Always available."""
def __init__(self):
self._vocab: dict[str, int] = {}
self._idf: dict[str, float] = {}
self._doc_count: int = 0
self._doc_freq: dict[str, int] = {}
@property
def name(self) -> str:
return "TF-IDF (local)"
@property
def dimension(self) -> int:
return len(self._vocab)
@staticmethod
def _tokenize(text: str) -> list[str]:
return [t for t in re.findall(r"\w+", text.lower()) if len(t) > 2]
def _update_idf(self, tokens: list[str]):
self._doc_count += 1
for t in set(tokens):
self._doc_freq[t] = self._doc_freq.get(t, 0) + 1
for t, df in self._doc_freq.items():
self._idf[t] = math.log((self._doc_count + 1) / (df + 1)) + 1.0
def embed(self, text: str) -> list[float]:
tokens = self._tokenize(text)
if not tokens:
return []
for t in tokens:
if t not in self._vocab:
self._vocab[t] = len(self._vocab)
self._update_idf(tokens)
dim = len(self._vocab)
vec = [0.0] * dim
tf = {}
for t in tokens:
tf[t] = tf.get(t, 0) + 1
for t, count in tf.items():
vec[self._vocab[t]] = (count / len(tokens)) * self._idf.get(t, 1.0)
norm = math.sqrt(sum(v * v for v in vec))
if norm > 0:
vec = [v / norm for v in vec]
return vec
def similarity(self, a: list[float], b: list[float]) -> float:
if len(a) != len(b):
mx = max(len(a), len(b))
a = a + [0.0] * (mx - len(a))
b = b + [0.0] * (mx - len(b))
return max(0.0, cosine_similarity(a, b))
def get_embedding_backend(prefer: str | None = None, ollama_url: str | None = None,
model: str | None = None) -> EmbeddingBackend:
"""Auto-detect best available embedding backend. Priority: Ollama > TF-IDF."""
env_pref = os.environ.get("MNEMOSYNE_EMBED_BACKEND")
effective = prefer or env_pref
if effective == "tfidf":
return TfidfEmbeddingBackend()
if effective in (None, "ollama"):
ollama = OllamaEmbeddingBackend(base_url=ollama_url, model=model)
if ollama._check_available():
return ollama
if effective == "ollama":
raise RuntimeError("Ollama backend requested but not available")
return TfidfEmbeddingBackend()

View File

@@ -34,6 +34,8 @@ class ArchiveEntry:
updated_at: Optional[str] = None # Set on mutation; None means same as created_at
links: list[str] = field(default_factory=list) # IDs of related entries
content_hash: Optional[str] = None # SHA-256 of title+content for dedup
vitality: float = 1.0 # 0.0 (dead) to 1.0 (fully alive)
last_accessed: Optional[str] = None # ISO datetime of last access; None = never accessed
def __post_init__(self):
if self.content_hash is None:
@@ -52,6 +54,8 @@ class ArchiveEntry:
"updated_at": self.updated_at,
"links": self.links,
"content_hash": self.content_hash,
"vitality": self.vitality,
"last_accessed": self.last_accessed,
}
@classmethod

View File

@@ -2,31 +2,63 @@
Computes semantic similarity between archive entries and creates
bidirectional links, forming the holographic graph structure.
Supports pluggable embedding backends for true semantic search.
Falls back to Jaccard token similarity when no backend is available.
"""
from __future__ import annotations
from typing import Optional
from typing import Optional, TYPE_CHECKING
from nexus.mnemosyne.entry import ArchiveEntry
if TYPE_CHECKING:
from nexus.mnemosyne.embeddings import EmbeddingBackend
class HolographicLinker:
"""Links archive entries via semantic similarity.
Phase 1 uses simple keyword overlap as the similarity metric.
Phase 2 will integrate ChromaDB embeddings from MemPalace.
With an embedding backend: cosine similarity on vectors.
Without: Jaccard similarity on token sets (legacy fallback).
"""
def __init__(self, similarity_threshold: float = 0.15):
def __init__(
self,
similarity_threshold: float = 0.15,
embedding_backend: Optional["EmbeddingBackend"] = None,
):
self.threshold = similarity_threshold
self._backend = embedding_backend
self._embed_cache: dict[str, list[float]] = {}
@property
def using_embeddings(self) -> bool:
return self._backend is not None
def _get_embedding(self, entry: ArchiveEntry) -> list[float]:
"""Get or compute cached embedding for an entry."""
if entry.id in self._embed_cache:
return self._embed_cache[entry.id]
text = f"{entry.title} {entry.content}"
vec = self._backend.embed(text) if self._backend else []
if vec:
self._embed_cache[entry.id] = vec
return vec
def compute_similarity(self, a: ArchiveEntry, b: ArchiveEntry) -> float:
"""Compute similarity score between two entries.
Returns float in [0, 1]. Phase 1: Jaccard similarity on
combined title+content tokens. Phase 2: cosine similarity
on ChromaDB embeddings.
Returns float in [0, 1]. Uses embedding cosine similarity if
a backend is configured, otherwise falls back to Jaccard.
"""
if self._backend:
vec_a = self._get_embedding(a)
vec_b = self._get_embedding(b)
if vec_a and vec_b:
return self._backend.similarity(vec_a, vec_b)
# Fallback: Jaccard on tokens
tokens_a = self._tokenize(f"{a.title} {a.content}")
tokens_b = self._tokenize(f"{b.title} {b.content}")
if not tokens_a or not tokens_b:
@@ -35,11 +67,10 @@ class HolographicLinker:
union = tokens_a | tokens_b
return len(intersection) / len(union)
def find_links(self, entry: ArchiveEntry, candidates: list[ArchiveEntry]) -> list[tuple[str, float]]:
"""Find entries worth linking to.
Returns list of (entry_id, similarity_score) tuples above threshold.
"""
def find_links(
self, entry: ArchiveEntry, candidates: list[ArchiveEntry]
) -> list[tuple[str, float]]:
"""Find entries worth linking to. Returns (entry_id, score) tuples."""
results = []
for candidate in candidates:
if candidate.id == entry.id:
@@ -58,16 +89,18 @@ class HolographicLinker:
if eid not in entry.links:
entry.links.append(eid)
new_links += 1
# Bidirectional
for c in candidates:
if c.id == eid and entry.id not in c.links:
c.links.append(entry.id)
return new_links
def clear_cache(self):
"""Clear embedding cache (call after bulk entry changes)."""
self._embed_cache.clear()
@staticmethod
def _tokenize(text: str) -> set[str]:
"""Simple whitespace + punctuation tokenizer."""
import re
tokens = set(re.findall(r"\w+", text.lower()))
# Remove very short tokens
return {t for t in tokens if len(t) > 2}

View File

@@ -0,0 +1,176 @@
"""Tests for MnemosyneArchive.consolidate() — duplicate/near-duplicate merging."""
import tempfile
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
def _archive(tmp: str) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=Path(tmp) / "archive.json", auto_embed=False)
def test_consolidate_exact_duplicate_removed():
"""Two entries with identical content_hash are merged; only one survives."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Hello world", content="Exactly the same content", topics=["a"])
# Manually add a second entry with the same hash to simulate a duplicate
e2 = ArchiveEntry(title="Hello world", content="Exactly the same content", topics=["b"])
# Bypass dedup guard so we can test consolidate() rather than add()
archive._entries[e2.id] = e2
archive._save()
assert archive.count == 2
merges = archive.consolidate(dry_run=False)
assert len(merges) == 1
assert merges[0]["reason"] == "exact_hash"
assert merges[0]["score"] == 1.0
assert archive.count == 1
def test_consolidate_keeps_older_entry():
"""The older entry (earlier created_at) is kept, the newer is removed."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Hello world", content="Same content here", topics=[])
e2 = ArchiveEntry(title="Hello world", content="Same content here", topics=[])
# Make e2 clearly newer
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
merges = archive.consolidate(dry_run=False)
assert len(merges) == 1
assert merges[0]["kept"] == e1.id
assert merges[0]["removed"] == e2.id
def test_consolidate_merges_topics():
"""Topics from the removed entry are merged (unioned) into the kept entry."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Memory item", content="Shared content body", topics=["alpha"])
e2 = ArchiveEntry(title="Memory item", content="Shared content body", topics=["beta", "gamma"])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor is not None
topic_lower = {t.lower() for t in survivor.topics}
assert "alpha" in topic_lower
assert "beta" in topic_lower
assert "gamma" in topic_lower
def test_consolidate_merges_metadata():
"""Metadata from the removed entry is merged into the kept entry; kept values win."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ArchiveEntry(
title="Shared", content="Identical body here", topics=[], metadata={"k1": "v1", "shared": "kept"}
)
archive._entries[e1.id] = e1
e2 = ArchiveEntry(
title="Shared", content="Identical body here", topics=[], metadata={"k2": "v2", "shared": "removed"}
)
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor.metadata["k1"] == "v1"
assert survivor.metadata["k2"] == "v2"
assert survivor.metadata["shared"] == "kept" # kept entry wins
def test_consolidate_dry_run_no_mutation():
"""Dry-run mode returns merge plan but does not alter the archive."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
ingest_event(archive, title="Same", content="Identical content to dedup", topics=[])
e2 = ArchiveEntry(title="Same", content="Identical content to dedup", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
merges = archive.consolidate(dry_run=True)
assert len(merges) == 1
assert merges[0]["dry_run"] is True
# Archive must be unchanged
assert archive.count == 2
def test_consolidate_no_duplicates():
"""When no duplicates exist, consolidate returns an empty list."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
ingest_event(archive, title="Unique A", content="This is completely unique content for A")
ingest_event(archive, title="Unique B", content="Totally different words here for B")
merges = archive.consolidate(threshold=0.9)
assert merges == []
def test_consolidate_transfers_links():
"""Links from the removed entry are inherited by the kept entry."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
# Create a third entry to act as a link target
target = ingest_event(archive, title="Target", content="The link target entry", topics=[])
e1 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[], links=[target.id])
archive._entries[e1.id] = e1
target.links.append(e1.id)
e2 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor is not None
assert target.id in survivor.links
def test_consolidate_near_duplicate_semantic():
"""Near-duplicate entries above the similarity threshold are merged."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
# Entries with very high Jaccard overlap
text_a = "python automation scripting building tools workflows"
text_b = "python automation scripting building tools workflows tasks"
e1 = ArchiveEntry(title="Automator", content=text_a, topics=[])
e2 = ArchiveEntry(title="Automator", content=text_b, topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e1.id] = e1
archive._entries[e2.id] = e2
archive._save()
# Use a low threshold to ensure these very similar entries match
merges = archive.consolidate(threshold=0.7, dry_run=False)
assert len(merges) >= 1
assert merges[0]["reason"] == "semantic_similarity"
def test_consolidate_persists_after_reload():
"""After consolidation, the reduced archive survives a save/reload cycle."""
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(archive, title="Persist test", content="Body to dedup and persist", topics=[])
e2 = ArchiveEntry(title="Persist test", content="Body to dedup and persist", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
assert archive.count == 1
reloaded = MnemosyneArchive(archive_path=path, auto_embed=False)
assert reloaded.count == 1

View File

@@ -0,0 +1,112 @@
"""Tests for the embedding backend module."""
from __future__ import annotations
import math
import pytest
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
TfidfEmbeddingBackend,
cosine_similarity,
get_embedding_backend,
)
class TestCosineSimilarity:
def test_identical_vectors(self):
a = [1.0, 2.0, 3.0]
assert abs(cosine_similarity(a, a) - 1.0) < 1e-9
def test_orthogonal_vectors(self):
a = [1.0, 0.0]
b = [0.0, 1.0]
assert abs(cosine_similarity(a, b) - 0.0) < 1e-9
def test_opposite_vectors(self):
a = [1.0, 0.0]
b = [-1.0, 0.0]
assert abs(cosine_similarity(a, b) - (-1.0)) < 1e-9
def test_zero_vector(self):
a = [0.0, 0.0]
b = [1.0, 2.0]
assert cosine_similarity(a, b) == 0.0
def test_dimension_mismatch(self):
with pytest.raises(ValueError):
cosine_similarity([1.0], [1.0, 2.0])
class TestTfidfEmbeddingBackend:
def test_basic_embed(self):
backend = TfidfEmbeddingBackend()
vec = backend.embed("hello world test")
assert len(vec) > 0
assert all(isinstance(v, float) for v in vec)
def test_empty_text(self):
backend = TfidfEmbeddingBackend()
vec = backend.embed("")
assert vec == []
def test_identical_texts_similar(self):
backend = TfidfEmbeddingBackend()
v1 = backend.embed("the cat sat on the mat")
v2 = backend.embed("the cat sat on the mat")
sim = backend.similarity(v1, v2)
assert sim > 0.99
def test_different_texts_less_similar(self):
backend = TfidfEmbeddingBackend()
v1 = backend.embed("python programming language")
v2 = backend.embed("cooking recipes italian food")
sim = backend.similarity(v1, v2)
assert sim < 0.5
def test_related_texts_more_similar(self):
backend = TfidfEmbeddingBackend()
v1 = backend.embed("machine learning neural networks")
v2 = backend.embed("deep learning artificial neural nets")
v3 = backend.embed("baking bread sourdough recipe")
sim_related = backend.similarity(v1, v2)
sim_unrelated = backend.similarity(v1, v3)
assert sim_related > sim_unrelated
def test_name(self):
backend = TfidfEmbeddingBackend()
assert "TF-IDF" in backend.name
def test_dimension_grows(self):
backend = TfidfEmbeddingBackend()
d1 = backend.dimension
backend.embed("new unique tokens here")
d2 = backend.dimension
assert d2 > d1
def test_padding_different_lengths(self):
backend = TfidfEmbeddingBackend()
v1 = backend.embed("short")
v2 = backend.embed("this is a much longer text with many more tokens")
# Should not raise despite different lengths
sim = backend.similarity(v1, v2)
assert 0.0 <= sim <= 1.0
class TestGetEmbeddingBackend:
def test_tfidf_preferred(self):
backend = get_embedding_backend(prefer="tfidf")
assert isinstance(backend, TfidfEmbeddingBackend)
def test_auto_returns_something(self):
backend = get_embedding_backend()
assert isinstance(backend, EmbeddingBackend)
def test_ollama_unavailable_falls_back(self):
# Should fall back to TF-IDF when Ollama is unreachable
backend = get_embedding_backend(prefer="ollama", ollama_url="http://localhost:1")
# If it raises, the test fails — it should fall back
# But with prefer="ollama" it raises if unavailable
# So we test without prefer:
backend = get_embedding_backend(ollama_url="http://localhost:1")
assert isinstance(backend, TfidfEmbeddingBackend)

View File

@@ -0,0 +1,278 @@
"""Tests for Mnemosyne memory decay system."""
import json
import os
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
"""Create a fresh archive for testing."""
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path)
@pytest.fixture
def populated_archive(tmp_path):
"""Create an archive with some entries."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path)
arch.add(ArchiveEntry(title="Fresh Entry", content="Just added", topics=["test"]))
arch.add(ArchiveEntry(title="Old Entry", content="Been here a while", topics=["test"]))
arch.add(ArchiveEntry(title="Another Entry", content="Some content", topics=["other"]))
return arch
class TestVitalityFields:
"""Test that vitality fields exist on entries."""
def test_entry_has_vitality_default(self):
entry = ArchiveEntry(title="Test", content="Content")
assert entry.vitality == 1.0
def test_entry_has_last_accessed_default(self):
entry = ArchiveEntry(title="Test", content="Content")
assert entry.last_accessed is None
def test_entry_roundtrip_with_vitality(self):
entry = ArchiveEntry(
title="Test", content="Content",
vitality=0.75,
last_accessed="2024-01-01T00:00:00+00:00"
)
d = entry.to_dict()
assert d["vitality"] == 0.75
assert d["last_accessed"] == "2024-01-01T00:00:00+00:00"
restored = ArchiveEntry.from_dict(d)
assert restored.vitality == 0.75
assert restored.last_accessed == "2024-01-01T00:00:00+00:00"
class TestTouch:
"""Test touch() access recording and vitality boost."""
def test_touch_sets_last_accessed(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
assert entry.last_accessed is None
touched = archive.touch(entry.id)
assert touched.last_accessed is not None
def test_touch_boosts_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.5))
touched = archive.touch(entry.id)
# Boost = 0.1 * (1 - 0.5) = 0.05, so vitality should be ~0.55
# (assuming no time decay in test — instantaneous)
assert touched.vitality > 0.5
assert touched.vitality <= 1.0
def test_touch_diminishing_returns(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.9))
touched = archive.touch(entry.id)
# Boost = 0.1 * (1 - 0.9) = 0.01, so vitality should be ~0.91
assert touched.vitality < 0.92
assert touched.vitality > 0.9
def test_touch_never_exceeds_one(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content", vitality=0.99))
for _ in range(10):
entry = archive.touch(entry.id)
assert entry.vitality <= 1.0
def test_touch_missing_entry_raises(self, archive):
with pytest.raises(KeyError):
archive.touch("nonexistent-id")
def test_touch_persists(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
archive.touch(entry.id)
# Reload archive
arch2 = MnemosyneArchive(archive_path=archive._path)
loaded = arch2.get(entry.id)
assert loaded.last_accessed is not None
class TestGetVitality:
"""Test get_vitality() status reporting."""
def test_get_vitality_basic(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
status = archive.get_vitality(entry.id)
assert status["entry_id"] == entry.id
assert status["title"] == "Test"
assert 0.0 <= status["vitality"] <= 1.0
assert status["age_days"] == 0
def test_get_vitality_missing_raises(self, archive):
with pytest.raises(KeyError):
archive.get_vitality("nonexistent-id")
class TestComputeVitality:
"""Test the decay computation."""
def test_new_entry_full_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
v = archive._compute_vitality(entry)
assert v == 1.0
def test_recently_touched_high_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
archive.touch(entry.id)
v = archive._compute_vitality(entry)
assert v > 0.99 # Should be essentially 1.0 since just touched
def test_old_entry_decays(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
# Simulate old access — set last_accessed to 60 days ago
old_date = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
entry.last_accessed = old_date
entry.vitality = 1.0
archive._save()
v = archive._compute_vitality(entry)
# 60 days with 30-day half-life: v = 1.0 * 0.5^(60/30) = 0.25
assert v < 0.3
assert v > 0.2
def test_very_old_entry_nearly_zero(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
old_date = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
entry.last_accessed = old_date
entry.vitality = 1.0
archive._save()
v = archive._compute_vitality(entry)
# 365 days / 30 half-life = ~12 half-lives -> ~0.0002
assert v < 0.01
class TestFading:
"""Test fading() — most neglected entries."""
def test_fading_returns_lowest_first(self, populated_archive):
entries = list(populated_archive._entries.values())
# Make one entry very old
old_entry = entries[1]
old_date = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
old_entry.last_accessed = old_date
old_entry.vitality = 1.0
populated_archive._save()
fading = populated_archive.fading(limit=3)
assert len(fading) <= 3
# First result should be the oldest
assert fading[0]["entry_id"] == old_entry.id
# Should be in ascending order
for i in range(len(fading) - 1):
assert fading[i]["vitality"] <= fading[i + 1]["vitality"]
def test_fading_empty_archive(self, archive):
fading = archive.fading()
assert fading == []
def test_fading_limit(self, populated_archive):
fading = populated_archive.fading(limit=2)
assert len(fading) == 2
class TestVibrant:
"""Test vibrant() — most alive entries."""
def test_vibrant_returns_highest_first(self, populated_archive):
entries = list(populated_archive._entries.values())
# Make one entry very old
old_entry = entries[1]
old_date = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
old_entry.last_accessed = old_date
old_entry.vitality = 1.0
populated_archive._save()
vibrant = populated_archive.vibrant(limit=3)
# Should be in descending order
for i in range(len(vibrant) - 1):
assert vibrant[i]["vitality"] >= vibrant[i + 1]["vitality"]
# First result should NOT be the old entry
assert vibrant[0]["entry_id"] != old_entry.id
def test_vibrant_empty_archive(self, archive):
vibrant = archive.vibrant()
assert vibrant == []
class TestApplyDecay:
"""Test apply_decay() bulk decay operation."""
def test_apply_decay_returns_stats(self, populated_archive):
result = populated_archive.apply_decay()
assert result["total_entries"] == 3
assert "decayed_count" in result
assert "avg_vitality" in result
assert "fading_count" in result
assert "vibrant_count" in result
def test_apply_decay_persists(self, populated_archive):
populated_archive.apply_decay()
# Reload
arch2 = MnemosyneArchive(archive_path=populated_archive._path)
result2 = arch2.apply_decay()
# Should show same entries
assert result2["total_entries"] == 3
def test_apply_decay_on_empty(self, archive):
result = archive.apply_decay()
assert result["total_entries"] == 0
assert result["avg_vitality"] == 0.0
class TestStatsVitality:
"""Test that stats() includes vitality summary."""
def test_stats_includes_vitality(self, populated_archive):
stats = populated_archive.stats()
assert "avg_vitality" in stats
assert "fading_count" in stats
assert "vibrant_count" in stats
assert 0.0 <= stats["avg_vitality"] <= 1.0
def test_stats_empty_archive(self, archive):
stats = archive.stats()
assert stats["avg_vitality"] == 0.0
assert stats["fading_count"] == 0
assert stats["vibrant_count"] == 0
class TestDecayLifecycle:
"""Integration test: full lifecycle from creation to fading."""
def test_entry_lifecycle(self, archive):
# Create
entry = archive.add(ArchiveEntry(title="Memory", content="A thing happened"))
assert entry.vitality == 1.0
# Touch a few times
for _ in range(5):
archive.touch(entry.id)
# Check it's vibrant
vibrant = archive.vibrant(limit=1)
assert len(vibrant) == 1
assert vibrant[0]["entry_id"] == entry.id
# Simulate time passing
entry.last_accessed = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
entry.vitality = 0.8
archive._save()
# Apply decay
result = archive.apply_decay()
assert result["total_entries"] == 1
# Check it's now fading
fading = archive.fading(limit=1)
assert fading[0]["entry_id"] == entry.id
assert fading[0]["vitality"] < 0.5