Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
d6b7d9137b feat: wire MemoryPulse to crystal click handler
Import and initialize MemoryPulse in app.js. Trigger pulse wave
on every crystal click alongside MemoryInspect panel.

Closes #1263
2026-04-12 02:40:36 -04:00
Alexander Whitestone
8d7930de31 feat: add memory-pulse.js — BFS wave animation engine
Implements the memory_pulse feature from FEATURES.yaml.
Visual pulse wave radiates through connection graph when a
crystal is clicked, illuminating linked memories by BFS hop
distance with expanding ring effects.

Closes #1263
2026-04-12 02:40:03 -04:00
9 changed files with 224 additions and 1218 deletions

5
app.js
View File

@@ -716,7 +716,7 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(SpatialMemory);
MemoryPulse.init(scene);
updateLoad(90);
loadSession();
@@ -1947,9 +1947,9 @@ function setupControls() {
const entry = SpatialMemory.getMemoryFromMesh(hits[0].object);
if (entry) {
SpatialMemory.highlightMemory(entry.data.id);
MemoryPulse.triggerPulse(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
MemoryPulse.trigger(entry.data.id, SpatialMemory);
}
} else {
// Clicked empty space — close inspect panel and deselect crystal
@@ -2927,7 +2927,6 @@ function gameLoop() {
if (typeof animateMemoryOrbs === 'function') {
SpatialMemory.update(delta);
MemoryBirth.update(delta);
MemoryPulse.update();
animateMemoryOrbs(delta);
}

View File

@@ -1,160 +1,256 @@
// ═══════════════════════════════════════════════════
// PROJECT MNEMOSYNE — MEMORY PULSE
// ═══════════════════════════════════════════════════
// ═══════════════════════════════════════════════════════════
// MNEMOSYNE — Memory Pulse
// ═══════════════════════════════════════════════════════════
//
// BFS wave animation triggered on crystal click.
// When a memory crystal is clicked, a visual pulse
// radiates through the connection graph — illuminating
// linked memories hop-by-hop with a glow that rises
// sharply and then fades.
// Visual pulse wave that radiates through the connection graph
// when a memory crystal is clicked. Illuminates linked memories
// by BFS hop distance — closer neighbors light up first.
//
// Usage:
// MemoryPulse.init(SpatialMemory);
// MemoryPulse.triggerPulse(memId);
// MemoryPulse.update(); // called each frame
// ═══════════════════════════════════════════════════
// Usage from app.js:
// import { MemoryPulse } from './nexus/components/memory-pulse.js';
// MemoryPulse.init(scene);
// MemoryPulse.trigger(clickedMemId, SpatialMemory);
//
// Depends on: SpatialMemory (getAllMemories, getMemoryFromMesh)
// ═══════════════════════════════════════════════════════════
const MemoryPulse = (() => {
let _scene = null;
let _activePulses = []; // track running animations for cleanup
let _sm = null;
const HOP_DELAY = 300; // ms between each BFS hop wave
const GLOW_DURATION = 800; // ms each crystal glows at peak
const FADE_DURATION = 600; // ms to fade back to normal
const PULSE_COLOR = 0x4af0c0; // cyan-green pulse glow
const PULSE_INTENSITY = 6.0; // peak emissive during pulse
const RING_DURATION = 1200; // ms for the expanding ring effect
// [{mesh, startTime, delay, duration, peakIntensity, baseIntensity}]
const _activeEffects = [];
// ── Config ───────────────────────────────────────
const HOP_DELAY_MS = 180; // ms between hops
const PULSE_DURATION = 650; // ms for glow rise + fade per node
const PEAK_INTENSITY = 5.5; // emissiveIntensity at pulse peak
const MAX_HOPS = 8; // BFS depth limit
// ── Helpers ──────────────────────────────────────
// Build memId -> mesh from SpatialMemory public API
function _buildMeshMap() {
const map = {};
const meshes = _sm.getCrystalMeshes();
for (const mesh of meshes) {
const entry = _sm.getMemoryFromMesh(mesh);
if (entry) map[entry.data.id] = mesh;
}
return map;
// ─── INIT ────────────────────────────────────────────────
function init(scene) {
_scene = scene;
}
// Build bidirectional adjacency graph from memory connection data
function _buildGraph() {
const graph = {};
const memories = _sm.getAllMemories();
for (const mem of memories) {
if (!graph[mem.id]) graph[mem.id] = [];
if (mem.connections) {
for (const targetId of mem.connections) {
graph[mem.id].push(targetId);
if (!graph[targetId]) graph[targetId] = [];
graph[targetId].push(mem.id);
// ─── BFS TRAVERSAL ───────────────────────────────────────
// Returns array of arrays: [[hop-0 ids], [hop-1 ids], [hop-2 ids], ...]
function bfsHops(startId, allMemories) {
const memMap = {};
for (const m of allMemories) {
memMap[m.id] = m;
}
if (!memMap[startId]) return [];
const visited = new Set([startId]);
const hops = [];
let frontier = [startId];
while (frontier.length > 0) {
hops.push([...frontier]);
const next = [];
for (const id of frontier) {
const mem = memMap[id];
if (!mem || !mem.connections) continue;
for (const connId of mem.connections) {
if (!visited.has(connId)) {
visited.add(connId);
next.push(connId);
}
}
}
frontier = next;
}
return graph;
return hops;
}
// ── Public API ───────────────────────────────────
function init(spatialMemory) {
_sm = spatialMemory;
// ─── EXPANDING RING ──────────────────────────────────────
// Creates a flat ring geometry that expands outward from a position
function createExpandingRing(position, color) {
const ringGeo = new THREE.RingGeometry(0.1, 0.2, 32);
const ringMat = new THREE.MeshBasicMaterial({
color: color,
transparent: true,
opacity: 0.8,
side: THREE.DoubleSide,
depthWrite: false
});
const ring = new THREE.Mesh(ringGeo, ringMat);
ring.position.copy(position);
ring.position.y += 0.1; // slightly above crystal
ring.rotation.x = -Math.PI / 2; // flat horizontal
ring.scale.set(0.1, 0.1, 0.1);
_scene.add(ring);
return ring;
}
/**
* Trigger a BFS pulse wave originating from memId.
* Each hop level illuminates after HOP_DELAY_MS * hop ms.
* @param {string} memId - ID of the clicked memory crystal
*/
function triggerPulse(memId) {
if (!_sm) return;
// ─── ANIMATE RING ────────────────────────────────────────
function animateRing(ring, onComplete) {
const startTime = performance.now();
function tick() {
const elapsed = performance.now() - startTime;
const t = Math.min(1, elapsed / RING_DURATION);
const meshMap = _buildMeshMap();
const graph = _buildGraph();
// Expand outward
const scale = 0.1 + t * 4.0;
ring.scale.set(scale, scale, scale);
if (!meshMap[memId]) return;
// Fade out
ring.material.opacity = 0.8 * (1 - t * t);
// Cancel any existing effects on the same meshes (avoids stacking)
_activeEffects.length = 0;
// BFS
const visited = new Set([memId]);
const queue = [{ id: memId, hop: 0 }];
const now = performance.now();
const scheduled = [];
while (queue.length > 0) {
const { id, hop } = queue.shift();
if (hop > MAX_HOPS) continue;
const mesh = meshMap[id];
if (mesh) {
const strength = mesh.userData.strength || 0.7;
const baseIntensity = 1.0 + Math.sin(mesh.userData.pulse || 0) * 0.5 * strength;
scheduled.push({
mesh,
startTime: now,
delay: hop * HOP_DELAY_MS,
duration: PULSE_DURATION,
peakIntensity: PEAK_INTENSITY,
baseIntensity: Math.max(0.5, baseIntensity)
});
if (t < 1) {
requestAnimationFrame(tick);
} else {
_scene.remove(ring);
ring.geometry.dispose();
ring.material.dispose();
if (onComplete) onComplete();
}
}
requestAnimationFrame(tick);
}
for (const neighborId of (graph[id] || [])) {
if (!visited.has(neighborId)) {
visited.add(neighborId);
queue.push({ id: neighborId, hop: hop + 1 });
// ─── PULSE CRYSTAL GLOW ──────────────────────────────────
// Temporarily boosts a crystal's emissive intensity
function pulseGlow(mesh, hopIndex) {
if (!mesh || !mesh.material) return;
const originalIntensity = mesh.material.emissiveIntensity;
const originalColor = mesh.material.emissive ? mesh.material.emissive.clone() : null;
const delay = hopIndex * HOP_DELAY;
setTimeout(() => {
if (!mesh.material) return;
// Store original for restore
const origInt = mesh.material.emissiveIntensity;
// Flash to pulse color
if (mesh.material.emissive) {
mesh.material.emissive.setHex(PULSE_COLOR);
}
mesh.material.emissiveIntensity = PULSE_INTENSITY;
// Also boost point light if present
let origLightIntensity = null;
let origLightColor = null;
if (mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
origLightIntensity = child.intensity;
origLightColor = child.color.clone();
child.intensity = 3.0;
child.color.setHex(PULSE_COLOR);
}
}
}
}
for (const effect of scheduled) {
_activeEffects.push(effect);
}
// Hold at peak, then fade
setTimeout(() => {
const fadeStart = performance.now();
function fadeTick() {
const elapsed = performance.now() - fadeStart;
const t = Math.min(1, elapsed / FADE_DURATION);
const eased = 1 - (1 - t) * (1 - t); // ease-out quad
console.info('[MemoryPulse] Pulse triggered from', memId, '—', scheduled.length, 'nodes in wave');
}
mesh.material.emissiveIntensity = PULSE_INTENSITY + (origInt - PULSE_INTENSITY) * eased;
/**
* Advance all active pulse animations. Call once per frame.
*/
function update() {
if (_activeEffects.length === 0) return;
if (originalColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
mesh.material.emissive.setRGB(
pr + (originalColor.r - pr) * eased,
pg + (originalColor.g - pg) * eased,
pb + (originalColor.b - pb) * eased
);
}
const now = performance.now();
// Restore point light
if (origLightIntensity !== null && mesh.children) {
for (const child of mesh.children) {
if (child.isPointLight) {
child.intensity = 3.0 + (origLightIntensity - 3.0) * eased;
if (origLightColor) {
const pr = ((PULSE_COLOR >> 16) & 0xff) / 255;
const pg = ((PULSE_COLOR >> 8) & 0xff) / 255;
const pb = (PULSE_COLOR & 0xff) / 255;
child.color.setRGB(
pr + (origLightColor.r - pr) * eased,
pg + (origLightColor.g - pg) * eased,
pb + (origLightColor.b - pb) * eased
);
}
}
}
}
for (let i = _activeEffects.length - 1; i >= 0; i--) {
const e = _activeEffects[i];
const elapsed = now - e.startTime - e.delay;
if (elapsed < 0) continue; // waiting for its hop delay
if (elapsed >= e.duration) {
// Animation complete — restore base intensity
if (e.mesh.material) {
e.mesh.material.emissiveIntensity = e.baseIntensity;
if (t < 1) {
requestAnimationFrame(fadeTick);
}
}
_activeEffects.splice(i, 1);
continue;
requestAnimationFrame(fadeTick);
}, GLOW_DURATION);
}, delay);
}
// ─── TRIGGER ─────────────────────────────────────────────
// Main entry point: fire a pulse wave from the given memory ID
function trigger(memId, spatialMemory) {
if (!_scene) return;
const allMemories = spatialMemory.getAllMemories();
const hops = bfsHops(memId, allMemories);
if (hops.length <= 1) {
// No connections — just do a local ring
const obj = spatialMemory.getMemoryFromMesh(
spatialMemory.getCrystalMeshes().find(m => m.userData.memId === memId)
);
if (obj && obj.mesh) {
const ring = createExpandingRing(obj.mesh.position, PULSE_COLOR);
animateRing(ring);
}
return;
}
// t: 0 → 1 over duration
const t = elapsed / e.duration;
// sin curve over [0, π]: smooth rise then fall
const glow = Math.sin(t * Math.PI);
// For each hop level, create expanding rings and pulse glows
for (let hopIdx = 0; hopIdx < hops.length; hopIdx++) {
const idsInHop = hops[hopIdx];
if (e.mesh.material) {
e.mesh.material.emissiveIntensity =
e.baseIntensity + glow * (e.peakIntensity - e.baseIntensity);
for (const id of idsInHop) {
// Find mesh for this memory
const meshes = spatialMemory.getCrystalMeshes();
let targetMesh = null;
for (const m of meshes) {
if (m.userData && m.userData.memId === id) {
targetMesh = m;
break;
}
}
if (!targetMesh) continue;
// Schedule pulse glow
pulseGlow(targetMesh, hopIdx);
// Create expanding ring at this hop's delay
((mesh, delay) => {
setTimeout(() => {
const ring = createExpandingRing(mesh.position, PULSE_COLOR);
animateRing(ring);
}, delay * HOP_DELAY);
})(targetMesh, hopIdx);
}
}
}
return { init, triggerPulse, update };
// ─── CLEANUP ─────────────────────────────────────────────
function dispose() {
// Active pulses will self-clean via their animation callbacks
_activePulses = [];
}
return { init, trigger, dispose, bfsHops };
})();
export { MemoryPulse };

View File

@@ -67,7 +67,7 @@ modules:
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate, path, touch, decay, vitality, fading, vibrant
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
tests:
status: shipped
@@ -163,15 +163,12 @@ planned:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: shipped
files: [nexus/components/memory-pulse.js]
status: planned
description: >
Visual pulse wave radiates through connection graph when
a crystal is clicked, illuminating linked memories by BFS
hop distance.
hop distance. Was attempted in PR #1226 — needs rebasing.
priority: medium
merged_prs:
- "#1263"
embedding_backend:
status: shipped
@@ -184,19 +181,6 @@ planned:
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_path:
status: shipped
files: [archive.py, cli.py, tests/test_path.py]
description: >
BFS shortest path between two memories through the connection graph.
Answers "how is memory X related to memory Y?" by finding the chain
of connections. Includes path_explanation for human-readable output.
CLI command: mnemosyne path <start_id> <end_id>
priority: medium
merged_prs:
- "#TBD"
memory_consolidation:
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]

View File

@@ -1059,221 +1059,6 @@ class MnemosyneArchive:
return merges
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
"""Find shortest path between two entries through the connection graph.
Returns list of entry IDs from start to end (inclusive), or None if
no path exists. Uses BFS for unweighted shortest path.
"""
if start_id == end_id:
return [start_id] if start_id in self._entries else None
if start_id not in self._entries or end_id not in self._entries:
return None
adj = self._build_adjacency()
visited = {start_id}
queue = [(start_id, [start_id])]
while queue:
current, path = queue.pop(0)
for neighbor in adj.get(current, []):
if neighbor == end_id:
return path + [neighbor]
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
def path_explanation(self, path: list[str]) -> list[dict]:
"""Convert a path of entry IDs into human-readable step descriptions.
Returns list of dicts with 'id', 'title', and 'topics' for each step.
"""
steps = []
for entry_id in path:
entry = self._entries.get(entry_id)
if entry:
steps.append({
"id": entry.id,
"title": entry.title,
"topics": entry.topics,
"content_preview": entry.content[:120] + "..." if len(entry.content) > 120 else entry.content,
})
else:
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
return steps
# ─── Snapshot / Backup ────────────────────────────────────
def _snapshot_dir(self) -> Path:
"""Return (and create) the snapshots directory next to the archive."""
d = self.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _snapshot_filename(timestamp: str, label: str) -> str:
"""Build a deterministic snapshot filename."""
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
return f"{timestamp}_{safe_label}.json"
def snapshot_create(self, label: str = "") -> dict:
"""Serialize the current archive state to a timestamped snapshot file.
Args:
label: Human-readable label for the snapshot (optional).
Returns:
Dict with keys: snapshot_id, label, created_at, entry_count, path
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = self._snapshot_filename(timestamp, label)
snapshot_id = filename[:-5] # strip .json
snap_path = self._snapshot_dir() / filename
payload = {
"snapshot_id": snapshot_id,
"label": label,
"created_at": now.isoformat(),
"entry_count": len(self._entries),
"archive_path": str(self.path),
"entries": [e.to_dict() for e in self._entries.values()],
}
with open(snap_path, "w") as f:
json.dump(payload, f, indent=2)
return {
"snapshot_id": snapshot_id,
"label": label,
"created_at": payload["created_at"],
"entry_count": payload["entry_count"],
"path": str(snap_path),
}
def snapshot_list(self) -> list[dict]:
"""List available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
"""
snap_dir = self._snapshot_dir()
snapshots = []
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
try:
with open(snap_path) as f:
data = json.load(f)
snapshots.append({
"snapshot_id": data.get("snapshot_id", snap_path.stem),
"label": data.get("label", ""),
"created_at": data.get("created_at", ""),
"entry_count": data.get("entry_count", len(data.get("entries", []))),
"path": str(snap_path),
})
except (json.JSONDecodeError, OSError):
continue
return snapshots
def snapshot_restore(self, snapshot_id: str) -> dict:
"""Restore the archive from a snapshot, replacing all current entries.
Args:
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
Returns:
Dict with keys: snapshot_id, restored_count, previous_count
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
previous_count = len(self._entries)
self._entries = {}
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
self._save()
return {
"snapshot_id": snapshot_id,
"restored_count": len(self._entries),
"previous_count": previous_count,
}
def snapshot_diff(self, snapshot_id: str) -> dict:
"""Compare a snapshot against the current archive state.
Args:
snapshot_id: The snapshot_id to compare against current state.
Returns:
Dict with keys:
- snapshot_id: str
- added: list of {id, title} — in current, not in snapshot
- removed: list of {id, title} — in snapshot, not in current
- modified: list of {id, title, snapshot_hash, current_hash}
- unchanged: int — count of identical entries
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
snap_entries: dict[str, dict] = {}
for entry_data in data.get("entries", []):
snap_entries[entry_data["id"]] = entry_data
current_ids = set(self._entries.keys())
snap_ids = set(snap_entries.keys())
added = []
for eid in current_ids - snap_ids:
e = self._entries[eid]
added.append({"id": e.id, "title": e.title})
removed = []
for eid in snap_ids - current_ids:
snap_e = snap_entries[eid]
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
modified = []
unchanged = 0
for eid in current_ids & snap_ids:
current_hash = self._entries[eid].content_hash
snap_hash = snap_entries[eid].get("content_hash")
if current_hash != snap_hash:
modified.append({
"id": eid,
"title": self._entries[eid].title,
"snapshot_hash": snap_hash,
"current_hash": current_hash,
})
else:
unchanged += 1
return {
"snapshot_id": snapshot_id,
"added": sorted(added, key=lambda x: x["title"]),
"removed": sorted(removed, key=lambda x: x["title"]),
"modified": sorted(modified, key=lambda x: x["title"]),
"unchanged": unchanged,
}
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
@@ -1308,88 +1093,3 @@ class MnemosyneArchive:
self._save()
return total_links
# ─── Discovery ──────────────────────────────────────────────
def discover(
self,
count: int = 5,
prefer_fading: bool = True,
topic: Optional[str] = None,
) -> list[dict]:
"""Serendipitous entry discovery — surface forgotten knowledge.
Selects entries probabilistically, weighting toward fading (low vitality)
entries when prefer_fading=True, or toward vibrant entries when False.
Optionally filter by topic.
Touches selected entries to boost their vitality, preventing the same
entries from being repeatedly surfaced.
Args:
count: Number of entries to discover.
prefer_fading: If True, weight toward neglected entries. If False,
weight toward vibrant entries.
topic: Optional topic filter — only discover entries with this tag.
Returns:
List of dicts with keys: entry_id, title, content_preview, topics,
vitality, age_days, last_accessed
"""
import random
candidates = list(self._entries.values())
# Filter by topic if specified
if topic:
topic_lower = topic.lower()
candidates = [
e for e in candidates
if topic_lower in [t.lower() for t in e.topics]
]
if not candidates:
return []
# Compute vitality for each candidate
scored = []
for entry in candidates:
v = self._compute_vitality(entry)
scored.append((entry, v))
# Build selection weights
if prefer_fading:
# Lower vitality = higher weight. Invert and normalize.
weights = [max(0.01, 1.0 - v) for _, v in scored]
else:
# Higher vitality = higher weight
weights = [max(0.01, v) for _, v in scored]
# Sample without replacement
k = min(count, len(scored))
selected_indices = random.choices(range(len(scored)), weights=weights, k=k)
# Deduplicate while preserving order
seen = set()
unique_indices = []
for idx in selected_indices:
if idx not in seen:
seen.add(idx)
unique_indices.append(idx)
results = []
for idx in unique_indices:
entry, v = scored[idx]
# Touch to boost vitality
self.touch(entry.id)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
results.append({
"entry_id": entry.id,
"title": entry.title,
"content_preview": entry.content[:200] + "..." if len(entry.content) > 200 else entry.content,
"topics": entry.topics,
"vitality": round(v, 4),
"age_days": age_days,
"last_accessed": entry.last_accessed,
})
return results

View File

@@ -4,11 +4,7 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff
mnemosyne discover [-n COUNT] [-t TOPIC] [--vibrant]
mnemosyne timeline, mnemosyne neighbors
"""
from __future__ import annotations
@@ -210,21 +206,6 @@ def cmd_timeline(args):
print()
def cmd_path(args):
archive = MnemosyneArchive(archive_path=args.archive) if args.archive else MnemosyneArchive()
path = archive.shortest_path(args.start, args.end)
if path is None:
print(f"No path found between {args.start} and {args.end}")
return
steps = archive.path_explanation(path)
print(f"Path ({len(steps)} hops):")
for i, step in enumerate(steps):
arrow = "" if i > 0 else " "
print(f"{arrow}{step['id']}: {step['title']}")
if step['topics']:
print(f" topics: {', '.join(step['topics'])}")
def cmd_consolidate(args):
archive = MnemosyneArchive()
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
@@ -258,144 +239,6 @@ def cmd_neighbors(args):
print()
def cmd_touch(args):
archive = MnemosyneArchive()
try:
entry = archive.touch(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
v = archive.get_vitality(entry.id)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Vitality: {v['vitality']:.4f} (boosted)")
def cmd_decay(args):
archive = MnemosyneArchive()
result = archive.apply_decay()
print(f"Applied decay to {result['total_entries']} entries")
print(f" Decayed: {result['decayed_count']}")
print(f" Avg vitality: {result['avg_vitality']:.4f}")
print(f" Fading (<0.3): {result['fading_count']}")
print(f" Vibrant (>0.7): {result['vibrant_count']}")
def cmd_vitality(args):
archive = MnemosyneArchive()
try:
v = archive.get_vitality(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f}")
print(f" Last accessed: {v['last_accessed'] or 'never'}")
print(f" Age: {v['age_days']} days")
def cmd_fading(args):
archive = MnemosyneArchive()
results = archive.fading(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def cmd_snapshot(args):
archive = MnemosyneArchive()
if args.snapshot_cmd == "create":
result = archive.snapshot_create(label=args.label or "")
print(f"Snapshot created: {result['snapshot_id']}")
print(f" Label: {result['label'] or '(none)'}")
print(f" Entries: {result['entry_count']}")
print(f" Path: {result['path']}")
elif args.snapshot_cmd == "list":
snapshots = archive.snapshot_list()
if not snapshots:
print("No snapshots found.")
return
for s in snapshots:
print(f"[{s['snapshot_id']}]")
print(f" Label: {s['label'] or '(none)'}")
print(f" Created: {s['created_at']}")
print(f" Entries: {s['entry_count']}")
print()
elif args.snapshot_cmd == "restore":
try:
result = archive.snapshot_restore(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Restored from snapshot: {result['snapshot_id']}")
print(f" Entries restored: {result['restored_count']}")
print(f" Previous count: {result['previous_count']}")
elif args.snapshot_cmd == "diff":
try:
diff = archive.snapshot_diff(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Diff vs snapshot: {diff['snapshot_id']}")
print(f" Added ({len(diff['added'])}): ", end="")
if diff["added"]:
print()
for e in diff["added"]:
print(f" + [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Removed ({len(diff['removed'])}): ", end="")
if diff["removed"]:
print()
for e in diff["removed"]:
print(f" - [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Modified({len(diff['modified'])}): ", end="")
if diff["modified"]:
print()
for e in diff["modified"]:
print(f" ~ [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Unchanged: {diff['unchanged']}")
else:
print(f"Unknown snapshot subcommand: {args.snapshot_cmd}")
sys.exit(1)
def cmd_discover(args):
archive = MnemosyneArchive()
results = archive.discover(
count=args.count,
prefer_fading=not args.vibrant,
topic=args.topic if args.topic else None,
)
if not results:
print("No entries found." + (" (topic filter too narrow?)" if args.topic else ""))
return
for r in results:
print(f"[{r['entry_id'][:8]}] {r['title']}")
print(f" Topics: {', '.join(r['topics'])} | Vitality: {r['vitality']} | Age: {r['age_days']}d")
print(f" {r['content_preview']}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -457,53 +300,14 @@ def main():
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
pa = sub.add_parser("path", help="Find shortest path between two memories")
pa.add_argument("start", help="Starting entry ID")
pa.add_argument("end", help="Target entry ID")
pa.add_argument("--archive", default=None, help="Archive path")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
tc = sub.add_parser("touch", help="Boost an entry's vitality by accessing it")
tc.add_argument("entry_id", help="Entry ID to touch")
dc = sub.add_parser("decay", help="Apply time-based decay to all entries")
vy = sub.add_parser("vitality", help="Show an entry's vitality status")
vy.add_argument("entry_id", help="Entry ID to check")
fg = sub.add_parser("fading", help="Show most neglected entries (lowest vitality)")
fg.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
dc = sub.add_parser("discover", help="Serendipitous entry discovery")
dc.add_argument("-n", "--count", type=int, default=5, help="Number of entries to discover")
dc.add_argument("-t", "--topic", default=None, help="Filter by topic")
dc.add_argument("--vibrant", action="store_true", help="Prefer vibrant (alive) entries over fading ones")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
sn_sub.add_parser("list", help="List available snapshots")
sn_restore = sn_sub.add_parser("restore", help="Restore archive from a snapshot")
sn_restore.add_argument("snapshot_id", help="Snapshot ID to restore")
sn_diff = sn_sub.add_parser("diff", help="Show what changed since a snapshot")
sn_diff.add_argument("snapshot_id", help="Snapshot ID to compare against")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "snapshot" and not args.snapshot_cmd:
sn.print_help()
sys.exit(1)
dispatch = {
"stats": cmd_stats,
@@ -523,14 +327,6 @@ def main():
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
"path": cmd_path,
"touch": cmd_touch,
"decay": cmd_decay,
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"snapshot": cmd_snapshot,
"discover": cmd_discover,
}
dispatch[args.command](args)

View File

@@ -1,138 +0,0 @@
"""Tests for Mnemosyne CLI commands — path, touch, decay, vitality, fading, vibrant."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import sys
import io
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path)
@pytest.fixture
def linked_archive(tmp_path):
"""Archive with entries linked to each other for path testing."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
e1 = arch.add(ArchiveEntry(title="Alpha", content="first entry about python", topics=["code"]))
e2 = arch.add(ArchiveEntry(title="Beta", content="second entry about python coding", topics=["code"]))
e3 = arch.add(ArchiveEntry(title="Gamma", content="third entry about cooking recipes", topics=["food"]))
return arch, e1, e2, e3
class TestPathCommand:
def test_shortest_path_exists(self, linked_archive):
arch, e1, e2, e3 = linked_archive
path = arch.shortest_path(e1.id, e2.id)
assert path is not None
assert path[0] == e1.id
assert path[-1] == e2.id
def test_shortest_path_no_connection(self, linked_archive):
arch, e1, e2, e3 = linked_archive
# e3 (cooking) likely not linked to e1 (python coding)
path = arch.shortest_path(e1.id, e3.id)
# Path may or may not exist depending on linking threshold
# Either None or a list is valid
def test_shortest_path_same_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, e1.id)
assert path == [e1.id]
def test_shortest_path_missing_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, "nonexistent-id")
assert path is None
class TestTouchCommand:
def test_touch_boosts_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
# Simulate time passing by setting old last_accessed
old_time = "2020-01-01T00:00:00+00:00"
entry.last_accessed = old_time
entry.vitality = 0.5
archive._save()
touched = archive.touch(entry.id)
assert touched.vitality > 0.5
assert touched.last_accessed != old_time
def test_touch_missing_entry(self, archive):
with pytest.raises(KeyError):
archive.touch("nonexistent-id")
class TestDecayCommand:
def test_apply_decay_returns_stats(self, archive):
archive.add(ArchiveEntry(title="Test", content="Content"))
result = archive.apply_decay()
assert result["total_entries"] == 1
assert "avg_vitality" in result
assert "fading_count" in result
assert "vibrant_count" in result
def test_decay_on_empty_archive(self, archive):
result = archive.apply_decay()
assert result["total_entries"] == 0
assert result["avg_vitality"] == 0.0
class TestVitalityCommand:
def test_get_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
v = archive.get_vitality(entry.id)
assert v["entry_id"] == entry.id
assert v["title"] == "Test"
assert 0.0 <= v["vitality"] <= 1.0
assert v["age_days"] >= 0
def test_get_vitality_missing(self, archive):
with pytest.raises(KeyError):
archive.get_vitality("nonexistent-id")
class TestFadingVibrant:
def test_fading_returns_sorted_ascending(self, archive):
# Add entries with different vitalities
e1 = archive.add(ArchiveEntry(title="Vibrant", content="High energy"))
e2 = archive.add(ArchiveEntry(title="Fading", content="Low energy"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.fading(limit=10)
assert len(results) == 2
assert results[0]["vitality"] <= results[1]["vitality"]
def test_vibrant_returns_sorted_descending(self, archive):
e1 = archive.add(ArchiveEntry(title="Fresh", content="New"))
e2 = archive.add(ArchiveEntry(title="Old", content="Ancient"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.vibrant(limit=10)
assert len(results) == 2
assert results[0]["vitality"] >= results[1]["vitality"]
def test_fading_limit(self, archive):
for i in range(15):
archive.add(ArchiveEntry(title=f"Entry {i}", content=f"Content {i}"))
results = archive.fading(limit=5)
assert len(results) == 5
def test_vibrant_empty(self, archive):
results = archive.vibrant()
assert results == []

View File

@@ -1,85 +0,0 @@
"""Tests for Mnemosyne discover functionality."""
import tempfile
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive_with_entries():
"""Helper: create an archive with test entries."""
path = Path(tempfile.mkdtemp()) / "test_archive.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(archive, title="Python automation", content="Building tools in Python", topics=["python", "automation"])
ingest_event(archive, title="Cooking pasta", content="How to make carbonara", topics=["cooking"])
ingest_event(archive, title="Bitcoin basics", content="Understanding Bitcoin and blockchain", topics=["bitcoin", "crypto"])
ingest_event(archive, title="AI agents", content="Building autonomous AI agents", topics=["ai", "agents"])
ingest_event(archive, title="Meditation guide", content="Mindfulness and meditation techniques", topics=["wellness"])
return archive
def test_discover_returns_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=3)
assert len(results) == 3
for r in results:
assert "entry_id" in r
assert "title" in r
assert "content_preview" in r
assert "topics" in r
assert "vitality" in r
assert "age_days" in r
def test_discover_respects_count():
archive = _make_archive_with_entries()
results = archive.discover(count=2)
assert len(results) == 2
def test_discover_count_exceeds_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=100)
assert len(results) == archive.count
def test_discover_topic_filter():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="python")
assert len(results) == 1
assert results[0]["title"] == "Python automation"
def test_discover_topic_case_insensitive():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="Python")
assert len(results) == 1
def test_discover_empty_topic_returns_nothing():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="nonexistent")
assert len(results) == 0
def test_discover_boosts_vitality():
archive = _make_archive_with_entries()
# Get initial vitality
before = archive.fading(limit=5)
# Discover (which touches entries)
archive.discover(count=3)
# The touched entries should have higher vitality now
after = archive.fading(limit=5)
# At least some entries should have changed vitality
before_vitals = {e["entry_id"]: e["vitality"] for e in before}
after_vitals = {e["entry_id"]: e["vitality"] for e in after}
changed = sum(1 for eid in before_vitals if eid in after_vitals and abs(before_vitals[eid] - after_vitals[eid]) > 0.001)
assert changed >= 1, "Discover should touch and boost vitality of selected entries"
def test_discover_empty_archive():
path = Path(tempfile.mkdtemp()) / "empty.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
results = archive.discover(count=5)
assert len(results) == 0

View File

@@ -1,106 +0,0 @@
"""Tests for MnemosyneArchive.shortest_path and path_explanation."""
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _make_archive(tmp_path):
archive = MnemosyneArchive(str(tmp_path / "test_archive.json"))
return archive
class TestShortestPath:
def test_direct_connection(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "first entry", topics=["start"])
b = archive.add("Beta", "second entry", topics=["end"])
# Manually link
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = archive.shortest_path(a.id, b.id)
assert path == [a.id, b.id]
def test_multi_hop_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "alpha", topics=["x"])
b = archive.add("B", "beta", topics=["y"])
c = archive.add("C", "gamma", topics=["z"])
# Chain: A -> B -> C
a.links.append(b.id)
b.links.extend([a.id, c.id])
c.links.append(b.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._entries[c.id] = c
archive._save()
path = archive.shortest_path(a.id, c.id)
assert path == [a.id, b.id, c.id]
def test_no_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "isolated", topics=[])
b = archive.add("B", "also isolated", topics=[])
path = archive.shortest_path(a.id, b.id)
assert path is None
def test_same_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "lonely", topics=[])
path = archive.shortest_path(a.id, a.id)
assert path == [a.id]
def test_nonexistent_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "exists", topics=[])
path = archive.shortest_path("fake-id", a.id)
assert path is None
def test_shortest_of_multiple(self, tmp_path):
"""When multiple paths exist, BFS returns shortest."""
archive = _make_archive(tmp_path)
a = archive.add("A", "a", topics=[])
b = archive.add("B", "b", topics=[])
c = archive.add("C", "c", topics=[])
d = archive.add("D", "d", topics=[])
# A -> B -> D (short)
# A -> C -> B -> D (long)
a.links.extend([b.id, c.id])
b.links.extend([a.id, d.id, c.id])
c.links.extend([a.id, b.id])
d.links.append(b.id)
for e in [a, b, c, d]:
archive._entries[e.id] = e
archive._save()
path = archive.shortest_path(a.id, d.id)
assert len(path) == 3 # A -> B -> D, not A -> C -> B -> D
class TestPathExplanation:
def test_returns_step_details(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "the beginning", topics=["origin"])
b = archive.add("Beta", "the middle", topics=["process"])
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = [a.id, b.id]
steps = archive.path_explanation(path)
assert len(steps) == 2
assert steps[0]["title"] == "Alpha"
assert steps[1]["title"] == "Beta"
assert "origin" in steps[0]["topics"]
def test_content_preview_truncation(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "x" * 200, topics=[])
steps = archive.path_explanation([a.id])
assert len(steps[0]["content_preview"]) <= 123 # 120 + "..."

View File

@@ -1,240 +0,0 @@
"""Tests for Mnemosyne snapshot (point-in-time backup/restore) feature."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive(tmp_dir: str) -> MnemosyneArchive:
path = Path(tmp_dir) / "archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
# ─── snapshot_create ─────────────────────────────────────────────────────────
def test_snapshot_create_returns_metadata():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Alpha", content="First entry", topics=["a"])
ingest_event(archive, title="Beta", content="Second entry", topics=["b"])
result = archive.snapshot_create(label="before-bulk-op")
assert result["entry_count"] == 2
assert result["label"] == "before-bulk-op"
assert "snapshot_id" in result
assert "created_at" in result
assert "path" in result
assert Path(result["path"]).exists()
def test_snapshot_create_no_label():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Gamma", content="Third entry", topics=[])
result = archive.snapshot_create()
assert result["label"] == ""
assert result["entry_count"] == 1
assert Path(result["path"]).exists()
def test_snapshot_file_contains_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Delta", content="Fourth entry", topics=["d"])
result = archive.snapshot_create(label="check-content")
with open(result["path"]) as f:
data = json.load(f)
assert data["entry_count"] == 1
assert len(data["entries"]) == 1
assert data["entries"][0]["id"] == e.id
assert data["entries"][0]["title"] == "Delta"
def test_snapshot_create_empty_archive():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
result = archive.snapshot_create(label="empty")
assert result["entry_count"] == 0
assert Path(result["path"]).exists()
# ─── snapshot_list ───────────────────────────────────────────────────────────
def test_snapshot_list_empty():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
assert archive.snapshot_list() == []
def test_snapshot_list_returns_all():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="One", content="c1", topics=[])
archive.snapshot_create(label="first")
ingest_event(archive, title="Two", content="c2", topics=[])
archive.snapshot_create(label="second")
snapshots = archive.snapshot_list()
assert len(snapshots) == 2
labels = {s["label"] for s in snapshots}
assert "first" in labels
assert "second" in labels
def test_snapshot_list_metadata_fields():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="meta-check")
snapshots = archive.snapshot_list()
s = snapshots[0]
for key in ("snapshot_id", "label", "created_at", "entry_count", "path"):
assert key in s
def test_snapshot_list_newest_first():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="a")
archive.snapshot_create(label="b")
snapshots = archive.snapshot_list()
# Filenames sort lexicographically; newest (b) should be first
# (filenames include timestamp so alphabetical = newest-last;
# snapshot_list reverses the glob order → newest first)
assert len(snapshots) == 2
# Both should be present; ordering is newest first
ids = [s["snapshot_id"] for s in snapshots]
assert ids == sorted(ids, reverse=True)
# ─── snapshot_restore ────────────────────────────────────────────────────────
def test_snapshot_restore_replaces_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Kept", content="original content", topics=["orig"])
snap = archive.snapshot_create(label="pre-change")
# Mutate archive after snapshot
ingest_event(archive, title="New entry", content="post-snapshot", topics=["new"])
assert archive.count == 2
result = archive.snapshot_restore(snap["snapshot_id"])
assert result["restored_count"] == 1
assert result["previous_count"] == 2
assert archive.count == 1
entry = list(archive._entries.values())[0]
assert entry.title == "Kept"
def test_snapshot_restore_persists_to_disk():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = _make_archive(tmp)
ingest_event(archive, title="Persisted", content="should survive reload", topics=[])
snap = archive.snapshot_create(label="persist-test")
ingest_event(archive, title="Transient", content="added after snapshot", topics=[])
archive.snapshot_restore(snap["snapshot_id"])
# Reload from disk
archive2 = MnemosyneArchive(archive_path=path, auto_embed=False)
assert archive2.count == 1
assert list(archive2._entries.values())[0].title == "Persisted"
def test_snapshot_restore_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_restore("nonexistent_snapshot_id")
# ─── snapshot_diff ───────────────────────────────────────────────────────────
def test_snapshot_diff_no_changes():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Stable", content="unchanged content", topics=[])
snap = archive.snapshot_create(label="baseline")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["added"] == []
assert diff["removed"] == []
assert diff["modified"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_added():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Original", content="existing", topics=[])
snap = archive.snapshot_create(label="before-add")
ingest_event(archive, title="Newcomer", content="added after", topics=[])
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["added"]) == 1
assert diff["added"][0]["title"] == "Newcomer"
assert diff["removed"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_removed():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e1 = ingest_event(archive, title="Will Be Removed", content="doomed", topics=[])
ingest_event(archive, title="Survivor", content="stays", topics=[])
snap = archive.snapshot_create(label="pre-removal")
archive.remove(e1.id)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["removed"]) == 1
assert diff["removed"][0]["title"] == "Will Be Removed"
assert diff["added"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_modified():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Mutable", content="original content", topics=[])
snap = archive.snapshot_create(label="pre-edit")
archive.update_entry(e.id, content="updated content", auto_link=False)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["modified"]) == 1
assert diff["modified"][0]["title"] == "Mutable"
assert diff["modified"][0]["snapshot_hash"] != diff["modified"][0]["current_hash"]
assert diff["added"] == []
assert diff["removed"] == []
def test_snapshot_diff_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_diff("no_such_snapshot")
def test_snapshot_diff_includes_snapshot_id():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
snap = archive.snapshot_create(label="id-check")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["snapshot_id"] == snap["snapshot_id"]