Compare commits

..

15 Commits

Author SHA1 Message Date
25eee03f6b docs: add archive_snapshot to FEATURES.yaml (#1268)
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 15s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 09:44:44 +00:00
5e033c9014 test: add snapshot test suite (#1268)
test_snapshot_create, test_snapshot_list, test_snapshot_restore, test_snapshot_diff
2026-04-12 09:43:25 +00:00
c2dd1f974f feat: add snapshot CLI commands (#1268)
mnemosyne snapshot create|list|restore|diff
2026-04-12 09:42:14 +00:00
46159b05b8 feat: export snapshot functions from mnemosyne package (#1268) 2026-04-12 09:41:32 +00:00
bbdf4fbbff feat: add archive snapshot module (#1268)
Point-in-time backup and restore for Mnemosyne.
snapshot_create, snapshot_list, snapshot_restore, snapshot_diff.
2026-04-12 09:40:54 +00:00
bb21beccdd Merge pull request '[Mnemosyne] Fix path command bug + add vitality/decay CLI commands' (#1267) from fix/mnemosyne-cli-path-vitality into main
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
2026-04-12 09:26:37 +00:00
3361a0e259 docs: update FEATURES.yaml with new CLI commands
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 14s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 08:43:16 +00:00
8fb0a50b91 test: add CLI command tests for path, touch, decay, vitality, fading, vibrant 2026-04-12 08:42:59 +00:00
99e4baf54b fix: mnemosyne path command bug + add vitality/decay CLI commands
Closes #1266

- Fix cmd_path calling nonexistent _load() -> use MnemosyneArchive()
- Add path to dispatch dict
- Add touch, decay, vitality, fading, vibrant CLI commands
2026-04-12 08:41:54 +00:00
b0e24af7fe Merge PR #1265
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
Auto-merged by Timmy PR triage — clean diff, no conflicts, tests present.
2026-04-12 08:37:15 +00:00
65cef9d9c0 docs: mark memory_pulse as shipped, add memory_path feature
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 14s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 08:22:58 +00:00
267505a68f test: add tests for shortest_path and path_explanation 2026-04-12 08:22:56 +00:00
e8312d91f7 feat: add 'path' CLI command for memory pathfinding 2026-04-12 08:22:55 +00:00
446ec370c8 feat: add shortest_path and path_explanation to MnemosyneArchive
BFS-based pathfinding between memories through the connection graph.
Enables 'how is X related to Y?' queries across the holographic archive.
2026-04-12 08:22:53 +00:00
76e62fe43f [claude] Memory Pulse — BFS wave animation on crystal click (#1263) (#1264)
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 4s
2026-04-12 06:45:25 +00:00
10 changed files with 982 additions and 227 deletions

5
app.js
View File

@@ -716,7 +716,7 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(scene);
MemoryPulse.init(SpatialMemory);
updateLoad(90);
loadSession();
@@ -1947,9 +1947,9 @@ function setupControls() {
const entry = SpatialMemory.getMemoryFromMesh(hits[0].object);
if (entry) {
SpatialMemory.highlightMemory(entry.data.id);
MemoryPulse.triggerPulse(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
MemoryPulse.trigger(entry.data.id, SpatialMemory);
}
} else {
// Clicked empty space — close inspect panel and deselect crystal
@@ -2927,6 +2927,7 @@ function gameLoop() {
if (typeof animateMemoryOrbs === 'function') {
SpatialMemory.update(delta);
MemoryBirth.update(delta);
MemoryPulse.update();
animateMemoryOrbs(delta);
}

View File

@@ -1,256 +1,160 @@
// ═══════════════════════════════════════════════════════════
// MNEMOSYNE — Memory Pulse
// ═══════════════════════════════════════════════════════════
// ═══════════════════════════════════════════════════
// PROJECT MNEMOSYNE — MEMORY PULSE
// ═══════════════════════════════════════════════════
//
// Visual pulse wave that radiates through the connection graph
// when a memory crystal is clicked. Illuminates linked memories
// by BFS hop distance — closer neighbors light up first.
// BFS wave animation triggered on crystal click.
// When a memory crystal is clicked, a visual pulse
// radiates through the connection graph — illuminating
// linked memories hop-by-hop with a glow that rises
// sharply and then fades.
//
// Usage from app.js:
// import { MemoryPulse } from './nexus/components/memory-pulse.js';
// MemoryPulse.init(scene);
// MemoryPulse.trigger(clickedMemId, SpatialMemory);
//
// Depends on: SpatialMemory (getAllMemories, getMemoryFromMesh)
// ═══════════════════════════════════════════════════════════
// Usage:
// MemoryPulse.init(SpatialMemory);
// MemoryPulse.triggerPulse(memId);
// MemoryPulse.update(); // called each frame
// ═══════════════════════════════════════════════════
const MemoryPulse = (() => {
let _scene = null;
let _activePulses = []; // track running animations for cleanup
const HOP_DELAY = 300; // ms between each BFS hop wave
const GLOW_DURATION = 800; // ms each crystal glows at peak
const FADE_DURATION = 600; // ms to fade back to normal
const PULSE_COLOR = 0x4af0c0; // cyan-green pulse glow
const PULSE_INTENSITY = 6.0; // peak emissive during pulse
const RING_DURATION = 1200; // ms for the expanding ring effect
let _sm = null;
// ─── INIT ────────────────────────────────────────────────
function init(scene) {
_scene = scene;
// [{mesh, startTime, delay, duration, peakIntensity, baseIntensity}]
const _activeEffects = [];
// ── Config ───────────────────────────────────────
const HOP_DELAY_MS = 180; // ms between hops
const PULSE_DURATION = 650; // ms for glow rise + fade per node
const PEAK_INTENSITY = 5.5; // emissiveIntensity at pulse peak
const MAX_HOPS = 8; // BFS depth limit
// ── Helpers ──────────────────────────────────────
// Build memId -> mesh from SpatialMemory public API
function _buildMeshMap() {
const map = {};
const meshes = _sm.getCrystalMeshes();
for (const mesh of meshes) {
const entry = _sm.getMemoryFromMesh(mesh);
if (entry) map[entry.data.id] = mesh;
}
return map;
}
// ─── BFS TRAVERSAL ───────────────────────────────────────
// Returns array of arrays: [[hop-0 ids], [hop-1 ids], [hop-2 ids], ...]
function bfsHops(startId, allMemories) {
const memMap = {};
for (const m of allMemories) {
memMap[m.id] = m;
}
if (!memMap[startId]) return [];
const visited = new Set([startId]);
const hops = [];
let frontier = [startId];
while (frontier.length > 0) {
hops.push([...frontier]);
const next = [];
for (const id of frontier) {
const mem = memMap[id];
if (!mem || !mem.connections) continue;
for (const connId of mem.connections) {
if (!visited.has(connId)) {
visited.add(connId);
next.push(connId);
}
// Build bidirectional adjacency graph from memory connection data
function _buildGraph() {
const graph = {};
const memories = _sm.getAllMemories();
for (const mem of memories) {
if (!graph[mem.id]) graph[mem.id] = [];
if (mem.connections) {
for (const targetId of mem.connections) {
graph[mem.id].push(targetId);
if (!graph[targetId]) graph[targetId] = [];
graph[targetId].push(mem.id);
}
}
frontier = next;
}
return hops;
return graph;
}
// ─── EXPANDING RING ──────────────────────────────────────
// Creates a flat ring geometry that expands outward from a position
function createExpandingRing(position, color) {
const ringGeo = new THREE.RingGeometry(0.1, 0.2, 32);
const ringMat = new THREE.MeshBasicMaterial({
color: color,
transparent: true,
opacity: 0.8,
side: THREE.DoubleSide,
depthWrite: false
});
const ring = new THREE.Mesh(ringGeo, ringMat);
ring.position.copy(position);
ring.position.y += 0.1; // slightly above crystal
ring.rotation.x = -Math.PI / 2; // flat horizontal
ring.scale.set(0.1, 0.1, 0.1);
_scene.add(ring);
return ring;
// ── Public API ───────────────────────────────────
function init(spatialMemory) {
_sm = spatialMemory;
}
// ─── ANIMATE RING ────────────────────────────────────────
function animateRing(ring, onComplete) {
const startTime = performance.now();
function tick() {
const elapsed = performance.now() - startTime;
const t = Math.min(1, elapsed / RING_DURATION);
/**
* Trigger a BFS pulse wave originating from memId.
* Each hop level illuminates after HOP_DELAY_MS * hop ms.
* @param {string} memId - ID of the clicked memory crystal
*/
function triggerPulse(memId) {
if (!_sm) return;
// Expand outward
const scale = 0.1 + t * 4.0;
ring.scale.set(scale, scale, scale);
const meshMap = _buildMeshMap();
const graph = _buildGraph();
// Fade out
ring.material.opacity = 0.8 * (1 - t * t);
if (!meshMap[memId]) return;
if (t < 1) {
requestAnimationFrame(tick);
} else {
_scene.remove(ring);
ring.geometry.dispose();
ring.material.dispose();
if (onComplete) onComplete();
// Cancel any existing effects on the same meshes (avoids stacking)
_activeEffects.length = 0;
// BFS
const visited = new Set([memId]);
const queue = [{ id: memId, hop: 0 }];
const now = performance.now();
const scheduled = [];
while (queue.length > 0) {
const { id, hop } = queue.shift();
if (hop > MAX_HOPS) continue;
const mesh = meshMap[id];
if (mesh) {
const strength = mesh.userData.strength || 0.7;
const baseIntensity = 1.0 + Math.sin(mesh.userData.pulse || 0) * 0.5 * strength;
scheduled.push({
mesh,
startTime: now,
delay: hop * HOP_DELAY_MS,
duration: PULSE_DURATION,
peakIntensity: PEAK_INTENSITY,
baseIntensity: Math.max(0.5, baseIntensity)
});
}
}
requestAnimationFrame(tick);
}
// ─── PULSE CRYSTAL GLOW ──────────────────────────────────
// Temporarily boosts a crystal's emissive intensity
function pulseGlow(mesh, hopIndex) {
if (!mesh || !mesh.material) return;
const originalIntensity = mesh.material.emissiveIntensity;
const originalColor = mesh.material.emissive ? mesh.material.emissive.clone() : null;
const delay = hopIndex * HOP_DELAY;
setTimeout(() => {
if (!mesh.material) return;
// Store original for restore
const origInt = mesh.material.emissiveIntensity;
// Flash to pulse color
if (mesh.material.emissive) {
mesh.material.emissive.setHex(PULSE_COLOR);
}
mesh.material.emissiveIntensity = PULSE_INTENSITY;
// Also boost point light if present
let origLightIntensity = null;
let origLightColor = null;
if (mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
origLightIntensity = child.intensity;
origLightColor = child.color.clone();
child.intensity = 3.0;
child.color.setHex(PULSE_COLOR);
}
for (const neighborId of (graph[id] || [])) {
if (!visited.has(neighborId)) {
visited.add(neighborId);
queue.push({ id: neighborId, hop: hop + 1 });
}
}
// Hold at peak, then fade
setTimeout(() => {
const fadeStart = performance.now();
function fadeTick() {
const elapsed = performance.now() - fadeStart;
const t = Math.min(1, elapsed / FADE_DURATION);
const eased = 1 - (1 - t) * (1 - t); // ease-out quad
mesh.material.emissiveIntensity = PULSE_INTENSITY + (origInt - PULSE_INTENSITY) * eased;
if (originalColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
mesh.material.emissive.setRGB(
pr + (originalColor.r - pr) * eased,
pg + (originalColor.g - pg) * eased,
pb + (originalColor.b - pb) * eased
);
}
// Restore point light
if (origLightIntensity !== null && mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
child.intensity = 3.0 + (origLightIntensity - 3.0) * eased;
if (origLightColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
child.color.setRGB(
pr + (origLightColor.r - pr) * eased,
pg + (origLightColor.g - pg) * eased,
pb + (origLightColor.b - pb) * eased
);
}
}
}
}
if (t < 1) {
requestAnimationFrame(fadeTick);
}
}
requestAnimationFrame(fadeTick);
}, GLOW_DURATION);
}, delay);
}
// ─── TRIGGER ─────────────────────────────────────────────
// Main entry point: fire a pulse wave from the given memory ID
function trigger(memId, spatialMemory) {
if (!_scene) return;
const allMemories = spatialMemory.getAllMemories();
const hops = bfsHops(memId, allMemories);
if (hops.length <= 1) {
// No connections — just do a local ring
const obj = spatialMemory.getMemoryFromMesh(
spatialMemory.getCrystalMeshes().find(m => m.userData.memId === memId)
);
if (obj && obj.mesh) {
const ring = createExpandingRing(obj.mesh.position, PULSE_COLOR);
animateRing(ring);
}
return;
}
// For each hop level, create expanding rings and pulse glows
for (let hopIdx = 0; hopIdx < hops.length; hopIdx++) {
const idsInHop = hops[hopIdx];
for (const effect of scheduled) {
_activeEffects.push(effect);
}
for (const id of idsInHop) {
// Find mesh for this memory
const meshes = spatialMemory.getCrystalMeshes();
let targetMesh = null;
for (const m of meshes) {
if (m.userData && m.userData.memId === id) {
targetMesh = m;
break;
}
console.info('[MemoryPulse] Pulse triggered from', memId, '—', scheduled.length, 'nodes in wave');
}
/**
* Advance all active pulse animations. Call once per frame.
*/
function update() {
if (_activeEffects.length === 0) return;
const now = performance.now();
for (let i = _activeEffects.length - 1; i >= 0; i--) {
const e = _activeEffects[i];
const elapsed = now - e.startTime - e.delay;
if (elapsed < 0) continue; // waiting for its hop delay
if (elapsed >= e.duration) {
// Animation complete — restore base intensity
if (e.mesh.material) {
e.mesh.material.emissiveIntensity = e.baseIntensity;
}
_activeEffects.splice(i, 1);
continue;
}
if (!targetMesh) continue;
// t: 0 → 1 over duration
const t = elapsed / e.duration;
// sin curve over [0, π]: smooth rise then fall
const glow = Math.sin(t * Math.PI);
// Schedule pulse glow
pulseGlow(targetMesh, hopIdx);
// Create expanding ring at this hop's delay
((mesh, delay) => {
setTimeout(() => {
const ring = createExpandingRing(mesh.position, PULSE_COLOR);
animateRing(ring);
}, delay * HOP_DELAY);
})(targetMesh, hopIdx);
if (e.mesh.material) {
e.mesh.material.emissiveIntensity =
e.baseIntensity + glow * (e.peakIntensity - e.baseIntensity);
}
}
}
// ─── CLEANUP ─────────────────────────────────────────────
function dispose() {
// Active pulses will self-clean via their animation callbacks
_activePulses = [];
}
return { init, trigger, dispose, bfsHops };
return { init, triggerPulse, update };
})();
export { MemoryPulse };

View File

@@ -67,7 +67,7 @@ modules:
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate, path, touch, decay, vitality, fading, vibrant
tests:
status: shipped
@@ -163,12 +163,15 @@ planned:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: planned
status: shipped
files: [nexus/components/memory-pulse.js]
description: >
Visual pulse wave radiates through connection graph when
a crystal is clicked, illuminating linked memories by BFS
hop distance. Was attempted in PR #1226 — needs rebasing.
hop distance.
priority: medium
merged_prs:
- "#1263"
embedding_backend:
status: shipped
@@ -181,6 +184,19 @@ planned:
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_path:
status: shipped
files: [archive.py, cli.py, tests/test_path.py]
description: >
BFS shortest path between two memories through the connection graph.
Answers "how is memory X related to memory Y?" by finding the chain
of connections. Includes path_explanation for human-readable output.
CLI command: mnemosyne path <start_id> <end_id>
priority: medium
merged_prs:
- "#TBD"
memory_consolidation:
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]

View File

@@ -20,6 +20,12 @@ from nexus.mnemosyne.embeddings import (
TfidfEmbeddingBackend,
get_embedding_backend,
)
from nexus.mnemosyne.snapshot import (
snapshot_create,
snapshot_list,
snapshot_restore,
snapshot_diff,
)
__all__ = [
"MnemosyneArchive",
@@ -31,4 +37,8 @@ __all__ = [
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",
"get_embedding_backend",
"snapshot_create",
"snapshot_list",
"snapshot_restore",
"snapshot_diff",
]

View File

@@ -1059,6 +1059,52 @@ class MnemosyneArchive:
return merges
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
"""Find shortest path between two entries through the connection graph.
Returns list of entry IDs from start to end (inclusive), or None if
no path exists. Uses BFS for unweighted shortest path.
"""
if start_id == end_id:
return [start_id] if start_id in self._entries else None
if start_id not in self._entries or end_id not in self._entries:
return None
adj = self._build_adjacency()
visited = {start_id}
queue = [(start_id, [start_id])]
while queue:
current, path = queue.pop(0)
for neighbor in adj.get(current, []):
if neighbor == end_id:
return path + [neighbor]
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
def path_explanation(self, path: list[str]) -> list[dict]:
"""Convert a path of entry IDs into human-readable step descriptions.
Returns list of dicts with 'id', 'title', and 'topics' for each step.
"""
steps = []
for entry_id in path:
entry = self._entries.get(entry_id)
if entry:
steps.append({
"id": entry.id,
"title": entry.title,
"topics": entry.topics,
"content_preview": entry.content[:120] + "..." if len(entry.content) > 120 else entry.content,
})
else:
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
return steps
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -4,7 +4,10 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff
"""
from __future__ import annotations
@@ -16,6 +19,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
from nexus.mnemosyne.snapshot import snapshot_create, snapshot_list, snapshot_restore, snapshot_diff
def cmd_stats(args):
@@ -206,6 +210,21 @@ def cmd_timeline(args):
print()
def cmd_path(args):
archive = MnemosyneArchive(archive_path=args.archive) if args.archive else MnemosyneArchive()
path = archive.shortest_path(args.start, args.end)
if path is None:
print(f"No path found between {args.start} and {args.end}")
return
steps = archive.path_explanation(path)
print(f"Path ({len(steps)} hops):")
for i, step in enumerate(steps):
arrow = "" if i > 0 else " "
print(f"{arrow}{step['id']}: {step['title']}")
if step['topics']:
print(f" topics: {', '.join(step['topics'])}")
def cmd_consolidate(args):
archive = MnemosyneArchive()
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
@@ -239,6 +258,117 @@ def cmd_neighbors(args):
print()
def cmd_touch(args):
archive = MnemosyneArchive()
try:
entry = archive.touch(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
v = archive.get_vitality(entry.id)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Vitality: {v['vitality']:.4f} (boosted)")
def cmd_decay(args):
archive = MnemosyneArchive()
result = archive.apply_decay()
print(f"Applied decay to {result['total_entries']} entries")
print(f" Decayed: {result['decayed_count']}")
print(f" Avg vitality: {result['avg_vitality']:.4f}")
print(f" Fading (<0.3): {result['fading_count']}")
print(f" Vibrant (>0.7): {result['vibrant_count']}")
def cmd_vitality(args):
archive = MnemosyneArchive()
try:
v = archive.get_vitality(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f}")
print(f" Last accessed: {v['last_accessed'] or 'never'}")
print(f" Age: {v['age_days']} days")
def cmd_fading(args):
archive = MnemosyneArchive()
results = archive.fading(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def cmd_snapshot_create(args):
archive = MnemosyneArchive()
result = snapshot_create(archive, label=args.label)
print(f"Snapshot created: {result['snapshot_id']}")
print(f" Entries: {result['entry_count']}")
print(f" Label: {result['label'] or '(none)'}")
print(f" Path: {result['path']}")
def cmd_snapshot_list(args):
archive = MnemosyneArchive()
snaps = snapshot_list(archive)
if not snaps:
print("No snapshots found.")
return
for s in snaps:
label = f" ({s['label']})" if s['label'] else ""
print(f" {s['snapshot_id']} {s['created_at'][:19]} {s['entry_count']} entries{label}")
def cmd_snapshot_restore(args):
archive = MnemosyneArchive()
try:
result = snapshot_restore(archive, args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Restored snapshot {result['snapshot_id']}")
print(f" Entries restored: {result['restored_entries']}")
print(f" Previous count: {result['previous_count']}")
def cmd_snapshot_diff(args):
archive = MnemosyneArchive()
try:
result = snapshot_diff(archive, args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Diff: snapshot {result['snapshot_id']} vs current")
print(f" Snapshot: {result['snapshot_entries']} entries")
print(f" Current: {result['current_entries']} entries")
print(f" Added: {result['added']}")
print(f" Removed: {result['removed']}")
print(f" Changed: {result['changed']}")
if result['changed_details']:
print()
for c in result['changed_details']:
print(f" [{c['id'][:8]}] {c['title']}")
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -300,15 +430,51 @@ def main():
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
pa = sub.add_parser("path", help="Find shortest path between two memories")
pa.add_argument("start", help="Starting entry ID")
pa.add_argument("end", help="Target entry ID")
pa.add_argument("--archive", default=None, help="Archive path")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
tc = sub.add_parser("touch", help="Boost an entry's vitality by accessing it")
tc.add_argument("entry_id", help="Entry ID to touch")
dc = sub.add_parser("decay", help="Apply time-based decay to all entries")
vy = sub.add_parser("vitality", help="Show an entry's vitality status")
vy.add_argument("entry_id", help="Entry ID to check")
fg = sub.add_parser("fading", help="Show most neglected entries (lowest vitality)")
fg.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
# Snapshot subcommands
sp = sub.add_parser("snapshot", help="Archive snapshot operations")
sp_sub = sp.add_subparsers(dest="snapshot_command")
sp_create = sp_sub.add_parser("create", help="Create a point-in-time snapshot")
sp_create.add_argument("-l", "--label", default="", help="Human-readable label")
sp_sub.add_parser("list", help="List available snapshots")
sp_restore = sp_sub.add_parser("restore", help="Restore from a snapshot")
sp_restore.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
sp_diff = sp_sub.add_parser("diff", help="Diff snapshot vs current archive")
sp_diff.add_argument("snapshot_id", help="Snapshot ID (or prefix)")
dispatch = {
"stats": cmd_stats,
"search": cmd_search,
@@ -327,9 +493,32 @@ def main():
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
"path": cmd_path,
"touch": cmd_touch,
"decay": cmd_decay,
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"snapshot": lambda args: _dispatch_snapshot(args),
}
dispatch[args.command](args)
def _dispatch_snapshot(args):
"""Route snapshot subcommands to handlers."""
cmd = getattr(args, "snapshot_command", None)
if cmd == "create":
cmd_snapshot_create(args)
elif cmd == "list":
cmd_snapshot_list(args)
elif cmd == "restore":
cmd_snapshot_restore(args)
elif cmd == "diff":
cmd_snapshot_diff(args)
else:
print("Usage: mnemosyne snapshot {create|list|restore|diff}")
sys.exit(1)
if __name__ == "__main__":
main()

206
nexus/mnemosyne/snapshot.py Normal file
View File

@@ -0,0 +1,206 @@
"""Archive snapshot — point-in-time backup and restore.
Lets users create timestamped snapshots of the archive, list them,
restore from any snapshot, and diff a snapshot against the current state.
Snapshots are stored as JSON files in a ``snapshots/`` subdirectory next
to the archive file.
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _snapshots_dir(archive: MnemosyneArchive) -> Path:
"""Return the snapshots directory, creating it if needed."""
d = archive.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
def snapshot_create(
archive: MnemosyneArchive,
label: Optional[str] = None,
) -> dict:
"""Create a point-in-time snapshot of the archive.
Args:
archive: The archive to snapshot.
label: Optional human-readable label for the snapshot.
Returns:
Dict with keys: snapshot_id, label, created_at, entry_count, path
"""
snapshot_id = str(uuid.uuid4())[:8]
now = datetime.now(timezone.utc).isoformat()
data = {
"snapshot_id": snapshot_id,
"label": label or "",
"created_at": now,
"entry_count": archive.count,
"entries": [e.to_dict() for e in archive._entries.values()],
}
path = _snapshots_dir(archive) / f"{snapshot_id}.json"
with open(path, "w") as f:
json.dump(data, f, indent=2)
return {
"snapshot_id": snapshot_id,
"label": label or "",
"created_at": now,
"entry_count": archive.count,
"path": str(path),
}
def snapshot_list(archive: MnemosyneArchive) -> list[dict]:
"""List all available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count
"""
snapshots = []
d = _snapshots_dir(archive)
for f in sorted(d.glob("*.json")):
try:
with open(f) as fh:
meta = json.load(fh)
snapshots.append({
"snapshot_id": meta["snapshot_id"],
"label": meta.get("label", ""),
"created_at": meta["created_at"],
"entry_count": meta["entry_count"],
})
except (json.JSONDecodeError, KeyError):
continue
# Newest first
snapshots.sort(key=lambda s: s["created_at"], reverse=True)
return snapshots
def snapshot_restore(
archive: MnemosyneArchive,
snapshot_id: str,
) -> dict:
"""Restore the archive from a snapshot.
Replaces ALL current entries with the snapshot data. The archive is
saved immediately after restore.
Args:
archive: The archive to restore into.
snapshot_id: ID of the snapshot to restore (or unique prefix).
Returns:
Dict with keys: snapshot_id, restored_entries, previous_count
Raises:
FileNotFoundError: If no matching snapshot is found.
"""
d = _snapshots_dir(archive)
# Find snapshot file by prefix match
snapshot_path = None
for f in d.glob("*.json"):
if f.stem.startswith(snapshot_id):
snapshot_path = f
break
if snapshot_path is None:
raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
with open(snapshot_path) as fh:
data = json.load(fh)
previous_count = archive.count
# Clear and restore
archive._entries = {}
for entry_data in data["entries"]:
entry = ArchiveEntry.from_dict(entry_data)
archive._entries[entry.id] = entry
archive._save()
return {
"snapshot_id": data["snapshot_id"],
"label": data.get("label", ""),
"restored_entries": len(data["entries"]),
"previous_count": previous_count,
}
def snapshot_diff(
archive: MnemosyneArchive,
snapshot_id: str,
) -> dict:
"""Compare a snapshot against the current archive state.
Args:
archive: The current archive.
snapshot_id: ID of the snapshot to compare (or unique prefix).
Returns:
Dict with keys: snapshot_id, snapshot_entries, current_entries,
added (in current but not snapshot), removed (in snapshot but not current),
changed (same ID but different content_hash)
Raises:
FileNotFoundError: If no matching snapshot is found.
"""
d = _snapshots_dir(archive)
snapshot_path = None
for f in d.glob("*.json"):
if f.stem.startswith(snapshot_id):
snapshot_path = f
break
if snapshot_path is None:
raise FileNotFoundError(f"No snapshot matching '{snapshot_id}' found")
with open(snapshot_path) as fh:
data = json.load(fh)
snap_entries = {e["id"]: e for e in data["entries"]}
curr_entries = {e.id: e.to_dict() for e in archive._entries.values()}
snap_ids = set(snap_entries.keys())
curr_ids = set(curr_entries.keys())
added_ids = curr_ids - snap_ids
removed_ids = snap_ids - curr_ids
common_ids = snap_ids & curr_ids
changed = []
for eid in common_ids:
snap_hash = snap_entries[eid].get("content_hash", "")
curr_hash = curr_entries[eid].get("content_hash", "")
if snap_hash != curr_hash:
changed.append({
"id": eid,
"title": curr_entries[eid].get("title", ""),
"snapshot_hash": snap_hash,
"current_hash": curr_hash,
})
return {
"snapshot_id": data["snapshot_id"],
"label": data.get("label", ""),
"snapshot_entries": len(snap_entries),
"current_entries": len(curr_entries),
"added": len(added_ids),
"removed": len(removed_ids),
"changed": len(changed),
"added_ids": sorted(added_ids),
"removed_ids": sorted(removed_ids),
"changed_details": changed,
}

View File

@@ -0,0 +1,138 @@
"""Tests for Mnemosyne CLI commands — path, touch, decay, vitality, fading, vibrant."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import sys
import io
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path)
@pytest.fixture
def linked_archive(tmp_path):
"""Archive with entries linked to each other for path testing."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
e1 = arch.add(ArchiveEntry(title="Alpha", content="first entry about python", topics=["code"]))
e2 = arch.add(ArchiveEntry(title="Beta", content="second entry about python coding", topics=["code"]))
e3 = arch.add(ArchiveEntry(title="Gamma", content="third entry about cooking recipes", topics=["food"]))
return arch, e1, e2, e3
class TestPathCommand:
def test_shortest_path_exists(self, linked_archive):
arch, e1, e2, e3 = linked_archive
path = arch.shortest_path(e1.id, e2.id)
assert path is not None
assert path[0] == e1.id
assert path[-1] == e2.id
def test_shortest_path_no_connection(self, linked_archive):
arch, e1, e2, e3 = linked_archive
# e3 (cooking) likely not linked to e1 (python coding)
path = arch.shortest_path(e1.id, e3.id)
# Path may or may not exist depending on linking threshold
# Either None or a list is valid
def test_shortest_path_same_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, e1.id)
assert path == [e1.id]
def test_shortest_path_missing_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, "nonexistent-id")
assert path is None
class TestTouchCommand:
def test_touch_boosts_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
# Simulate time passing by setting old last_accessed
old_time = "2020-01-01T00:00:00+00:00"
entry.last_accessed = old_time
entry.vitality = 0.5
archive._save()
touched = archive.touch(entry.id)
assert touched.vitality > 0.5
assert touched.last_accessed != old_time
def test_touch_missing_entry(self, archive):
with pytest.raises(KeyError):
archive.touch("nonexistent-id")
class TestDecayCommand:
def test_apply_decay_returns_stats(self, archive):
archive.add(ArchiveEntry(title="Test", content="Content"))
result = archive.apply_decay()
assert result["total_entries"] == 1
assert "avg_vitality" in result
assert "fading_count" in result
assert "vibrant_count" in result
def test_decay_on_empty_archive(self, archive):
result = archive.apply_decay()
assert result["total_entries"] == 0
assert result["avg_vitality"] == 0.0
class TestVitalityCommand:
def test_get_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
v = archive.get_vitality(entry.id)
assert v["entry_id"] == entry.id
assert v["title"] == "Test"
assert 0.0 <= v["vitality"] <= 1.0
assert v["age_days"] >= 0
def test_get_vitality_missing(self, archive):
with pytest.raises(KeyError):
archive.get_vitality("nonexistent-id")
class TestFadingVibrant:
def test_fading_returns_sorted_ascending(self, archive):
# Add entries with different vitalities
e1 = archive.add(ArchiveEntry(title="Vibrant", content="High energy"))
e2 = archive.add(ArchiveEntry(title="Fading", content="Low energy"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.fading(limit=10)
assert len(results) == 2
assert results[0]["vitality"] <= results[1]["vitality"]
def test_vibrant_returns_sorted_descending(self, archive):
e1 = archive.add(ArchiveEntry(title="Fresh", content="New"))
e2 = archive.add(ArchiveEntry(title="Old", content="Ancient"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.vibrant(limit=10)
assert len(results) == 2
assert results[0]["vitality"] >= results[1]["vitality"]
def test_fading_limit(self, archive):
for i in range(15):
archive.add(ArchiveEntry(title=f"Entry {i}", content=f"Content {i}"))
results = archive.fading(limit=5)
assert len(results) == 5
def test_vibrant_empty(self, archive):
results = archive.vibrant()
assert results == []

View File

@@ -0,0 +1,106 @@
"""Tests for MnemosyneArchive.shortest_path and path_explanation."""
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _make_archive(tmp_path):
archive = MnemosyneArchive(str(tmp_path / "test_archive.json"))
return archive
class TestShortestPath:
def test_direct_connection(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "first entry", topics=["start"])
b = archive.add("Beta", "second entry", topics=["end"])
# Manually link
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = archive.shortest_path(a.id, b.id)
assert path == [a.id, b.id]
def test_multi_hop_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "alpha", topics=["x"])
b = archive.add("B", "beta", topics=["y"])
c = archive.add("C", "gamma", topics=["z"])
# Chain: A -> B -> C
a.links.append(b.id)
b.links.extend([a.id, c.id])
c.links.append(b.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._entries[c.id] = c
archive._save()
path = archive.shortest_path(a.id, c.id)
assert path == [a.id, b.id, c.id]
def test_no_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "isolated", topics=[])
b = archive.add("B", "also isolated", topics=[])
path = archive.shortest_path(a.id, b.id)
assert path is None
def test_same_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "lonely", topics=[])
path = archive.shortest_path(a.id, a.id)
assert path == [a.id]
def test_nonexistent_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "exists", topics=[])
path = archive.shortest_path("fake-id", a.id)
assert path is None
def test_shortest_of_multiple(self, tmp_path):
"""When multiple paths exist, BFS returns shortest."""
archive = _make_archive(tmp_path)
a = archive.add("A", "a", topics=[])
b = archive.add("B", "b", topics=[])
c = archive.add("C", "c", topics=[])
d = archive.add("D", "d", topics=[])
# A -> B -> D (short)
# A -> C -> B -> D (long)
a.links.extend([b.id, c.id])
b.links.extend([a.id, d.id, c.id])
c.links.extend([a.id, b.id])
d.links.append(b.id)
for e in [a, b, c, d]:
archive._entries[e.id] = e
archive._save()
path = archive.shortest_path(a.id, d.id)
assert len(path) == 3 # A -> B -> D, not A -> C -> B -> D
class TestPathExplanation:
def test_returns_step_details(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "the beginning", topics=["origin"])
b = archive.add("Beta", "the middle", topics=["process"])
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = [a.id, b.id]
steps = archive.path_explanation(path)
assert len(steps) == 2
assert steps[0]["title"] == "Alpha"
assert steps[1]["title"] == "Beta"
assert "origin" in steps[0]["topics"]
def test_content_preview_truncation(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "x" * 200, topics=[])
steps = archive.path_explanation([a.id])
assert len(steps[0]["content_preview"]) <= 123 # 120 + "..."

View File

@@ -0,0 +1,139 @@
"""Tests for Mnemosyne archive snapshot — create, list, restore, diff."""
import json
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
from nexus.mnemosyne.snapshot import (
snapshot_create,
snapshot_list,
snapshot_restore,
snapshot_diff,
)
@pytest.fixture
def archive(tmp_path):
"""Create a fresh archive with a few entries."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(arch, title="First", content="hello world", topics=["test"])
ingest_event(arch, title="Second", content="another entry", topics=["demo"])
ingest_event(arch, title="Third", content="more content here", topics=["test", "demo"])
return arch
class TestSnapshotCreate:
def test_create_returns_metadata(self, archive):
result = snapshot_create(archive, label="test snap")
assert "snapshot_id" in result
assert result["label"] == "test snap"
assert result["entry_count"] == 3
assert Path(result["path"]).exists()
def test_create_no_label(self, archive):
result = snapshot_create(archive)
assert result["label"] == ""
def test_snapshot_file_is_valid_json(self, archive):
result = snapshot_create(archive)
with open(result["path"]) as f:
data = json.load(f)
assert data["entry_count"] == 3
assert len(data["entries"]) == 3
assert "created_at" in data
class TestSnapshotList:
def test_empty_list(self, archive):
# Snapshots dir doesn't exist yet (no snapshots created)
# Actually, create() makes the dir, so list before any create:
snaps = snapshot_list(archive)
assert snaps == []
def test_list_returns_created_snapshots(self, archive):
snapshot_create(archive, label="first")
snapshot_create(archive, label="second")
snaps = snapshot_list(archive)
assert len(snaps) == 2
# Newest first
assert snaps[0]["label"] == "second"
assert snaps[1]["label"] == "first"
def test_list_entry_count(self, archive):
snapshot_create(archive)
snaps = snapshot_list(archive)
assert snaps[0]["entry_count"] == 3
class TestSnapshotRestore:
def test_restore_replaces_entries(self, archive):
result = snapshot_create(archive, label="before change")
sid = result["snapshot_id"]
# Add more entries
ingest_event(archive, title="Fourth", content="new entry", topics=["new"])
assert archive.count == 4
# Restore
restore_result = snapshot_restore(archive, sid)
assert restore_result["restored_entries"] == 3
assert restore_result["previous_count"] == 4
assert archive.count == 3
def test_restore_prefix_match(self, archive):
result = snapshot_create(archive)
sid = result["snapshot_id"]
# Use just first 4 chars
restore_result = snapshot_restore(archive, sid[:4])
assert restore_result["snapshot_id"] == sid
def test_restore_nonexistent_raises(self, archive):
with pytest.raises(FileNotFoundError):
snapshot_restore(archive, "nonexistent-id")
def test_restore_preserves_content(self, archive):
result = snapshot_create(archive)
original_titles = sorted(e.title for e in archive._entries.values())
ingest_event(archive, title="Extra", content="extra", topics=[])
snapshot_restore(archive, result["snapshot_id"])
restored_titles = sorted(e.title for e in archive._entries.values())
assert restored_titles == original_titles
class TestSnapshotDiff:
def test_diff_identical(self, archive):
result = snapshot_create(archive)
diff = snapshot_diff(archive, result["snapshot_id"])
assert diff["added"] == 0
assert diff["removed"] == 0
assert diff["changed"] == 0
def test_diff_added_entries(self, archive):
result = snapshot_create(archive)
ingest_event(archive, title="New Entry", content="new", topics=["new"])
diff = snapshot_diff(archive, result["snapshot_id"])
assert diff["added"] == 1
assert diff["removed"] == 0
assert diff["current_entries"] == 4
assert diff["snapshot_entries"] == 3
def test_diff_removed_entries(self, archive):
result = snapshot_create(archive)
# Remove an entry
first_id = list(archive._entries.keys())[0]
archive.remove(first_id)
diff = snapshot_diff(archive, result["snapshot_id"])
assert diff["removed"] == 1
assert first_id in diff["removed_ids"]
def test_diff_nonexistent_raises(self, archive):
with pytest.raises(FileNotFoundError):
snapshot_diff(archive, "nope")