Compare commits

...

18 Commits

Author SHA1 Message Date
b28b9163ee Merge CLI
Some checks failed
CI / test (pull_request) Failing after 10s
CI / validate (pull_request) Failing after 17s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 12:15:37 +00:00
fdbb4e7b5c Merge #1271, #1273
Some checks failed
CI / test (pull_request) Failing after 11s
CI / validate (pull_request) Failing after 17s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 12:15:35 +00:00
14c431190b Merge #1269 2026-04-12 12:15:33 +00:00
ccde99e749 docs(mnemosyne): add memory_resonance to FEATURES.yaml
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 13s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 11:15:52 +00:00
09b5ea24f4 test(mnemosyne): add resonance latent connection tests 2026-04-12 11:15:27 +00:00
1eb1ec69e9 feat(mnemosyne): add resonance CLI command 2026-04-12 11:14:32 +00:00
30fcc00067 feat(mnemosyne): add resonance() — discover latent connections between entries
Closes #1272
2026-04-12 11:14:14 +00:00
fd8f82315c [claude] Mnemosyne archive snapshots — backup and restore (#1268) (#1270)
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
2026-04-12 09:49:31 +00:00
bb21beccdd Merge pull request '[Mnemosyne] Fix path command bug + add vitality/decay CLI commands' (#1267) from fix/mnemosyne-cli-path-vitality into main
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
2026-04-12 09:26:37 +00:00
3361a0e259 docs: update FEATURES.yaml with new CLI commands
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 14s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 08:43:16 +00:00
8fb0a50b91 test: add CLI command tests for path, touch, decay, vitality, fading, vibrant 2026-04-12 08:42:59 +00:00
99e4baf54b fix: mnemosyne path command bug + add vitality/decay CLI commands
Closes #1266

- Fix cmd_path calling nonexistent _load() -> use MnemosyneArchive()
- Add path to dispatch dict
- Add touch, decay, vitality, fading, vibrant CLI commands
2026-04-12 08:41:54 +00:00
b0e24af7fe Merge PR #1265
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
Auto-merged by Timmy PR triage — clean diff, no conflicts, tests present.
2026-04-12 08:37:15 +00:00
65cef9d9c0 docs: mark memory_pulse as shipped, add memory_path feature
Some checks failed
CI / test (pull_request) Failing after 9s
CI / validate (pull_request) Failing after 14s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 08:22:58 +00:00
267505a68f test: add tests for shortest_path and path_explanation 2026-04-12 08:22:56 +00:00
e8312d91f7 feat: add 'path' CLI command for memory pathfinding 2026-04-12 08:22:55 +00:00
446ec370c8 feat: add shortest_path and path_explanation to MnemosyneArchive
BFS-based pathfinding between memories through the connection graph.
Enables 'how is X related to Y?' queries across the holographic archive.
2026-04-12 08:22:53 +00:00
76e62fe43f [claude] Memory Pulse — BFS wave animation on crystal click (#1263) (#1264)
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 4s
2026-04-12 06:45:25 +00:00
10 changed files with 1332 additions and 4 deletions

4
app.js
View File

@@ -7,6 +7,7 @@ import { SpatialMemory } from './nexus/components/spatial-memory.js';
import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
@@ -715,6 +716,7 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(SpatialMemory);
updateLoad(90);
loadSession();
@@ -1945,6 +1947,7 @@ function setupControls() {
const entry = SpatialMemory.getMemoryFromMesh(hits[0].object);
if (entry) {
SpatialMemory.highlightMemory(entry.data.id);
MemoryPulse.triggerPulse(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
}
@@ -2924,6 +2927,7 @@ function gameLoop() {
if (typeof animateMemoryOrbs === 'function') {
SpatialMemory.update(delta);
MemoryBirth.update(delta);
MemoryPulse.update();
animateMemoryOrbs(delta);
}

View File

@@ -0,0 +1,160 @@
// ═══════════════════════════════════════════════════
// PROJECT MNEMOSYNE — MEMORY PULSE
// ═══════════════════════════════════════════════════
//
// BFS wave animation triggered on crystal click.
// When a memory crystal is clicked, a visual pulse
// radiates through the connection graph — illuminating
// linked memories hop-by-hop with a glow that rises
// sharply and then fades.
//
// Usage:
// MemoryPulse.init(SpatialMemory);
// MemoryPulse.triggerPulse(memId);
// MemoryPulse.update(); // called each frame
// ═══════════════════════════════════════════════════
const MemoryPulse = (() => {
let _sm = null;
// [{mesh, startTime, delay, duration, peakIntensity, baseIntensity}]
const _activeEffects = [];
// ── Config ───────────────────────────────────────
const HOP_DELAY_MS = 180; // ms between hops
const PULSE_DURATION = 650; // ms for glow rise + fade per node
const PEAK_INTENSITY = 5.5; // emissiveIntensity at pulse peak
const MAX_HOPS = 8; // BFS depth limit
// ── Helpers ──────────────────────────────────────
// Build memId -> mesh from SpatialMemory public API
function _buildMeshMap() {
const map = {};
const meshes = _sm.getCrystalMeshes();
for (const mesh of meshes) {
const entry = _sm.getMemoryFromMesh(mesh);
if (entry) map[entry.data.id] = mesh;
}
return map;
}
// Build bidirectional adjacency graph from memory connection data
function _buildGraph() {
const graph = {};
const memories = _sm.getAllMemories();
for (const mem of memories) {
if (!graph[mem.id]) graph[mem.id] = [];
if (mem.connections) {
for (const targetId of mem.connections) {
graph[mem.id].push(targetId);
if (!graph[targetId]) graph[targetId] = [];
graph[targetId].push(mem.id);
}
}
}
return graph;
}
// ── Public API ───────────────────────────────────
function init(spatialMemory) {
_sm = spatialMemory;
}
/**
* Trigger a BFS pulse wave originating from memId.
* Each hop level illuminates after HOP_DELAY_MS * hop ms.
* @param {string} memId - ID of the clicked memory crystal
*/
function triggerPulse(memId) {
if (!_sm) return;
const meshMap = _buildMeshMap();
const graph = _buildGraph();
if (!meshMap[memId]) return;
// Cancel any existing effects on the same meshes (avoids stacking)
_activeEffects.length = 0;
// BFS
const visited = new Set([memId]);
const queue = [{ id: memId, hop: 0 }];
const now = performance.now();
const scheduled = [];
while (queue.length > 0) {
const { id, hop } = queue.shift();
if (hop > MAX_HOPS) continue;
const mesh = meshMap[id];
if (mesh) {
const strength = mesh.userData.strength || 0.7;
const baseIntensity = 1.0 + Math.sin(mesh.userData.pulse || 0) * 0.5 * strength;
scheduled.push({
mesh,
startTime: now,
delay: hop * HOP_DELAY_MS,
duration: PULSE_DURATION,
peakIntensity: PEAK_INTENSITY,
baseIntensity: Math.max(0.5, baseIntensity)
});
}
for (const neighborId of (graph[id] || [])) {
if (!visited.has(neighborId)) {
visited.add(neighborId);
queue.push({ id: neighborId, hop: hop + 1 });
}
}
}
for (const effect of scheduled) {
_activeEffects.push(effect);
}
console.info('[MemoryPulse] Pulse triggered from', memId, '—', scheduled.length, 'nodes in wave');
}
/**
* Advance all active pulse animations. Call once per frame.
*/
function update() {
if (_activeEffects.length === 0) return;
const now = performance.now();
for (let i = _activeEffects.length - 1; i >= 0; i--) {
const e = _activeEffects[i];
const elapsed = now - e.startTime - e.delay;
if (elapsed < 0) continue; // waiting for its hop delay
if (elapsed >= e.duration) {
// Animation complete — restore base intensity
if (e.mesh.material) {
e.mesh.material.emissiveIntensity = e.baseIntensity;
}
_activeEffects.splice(i, 1);
continue;
}
// t: 0 → 1 over duration
const t = elapsed / e.duration;
// sin curve over [0, π]: smooth rise then fall
const glow = Math.sin(t * Math.PI);
if (e.mesh.material) {
e.mesh.material.emissiveIntensity =
e.baseIntensity + glow * (e.peakIntensity - e.baseIntensity);
}
}
}
return { init, triggerPulse, update };
})();
export { MemoryPulse };

View File

@@ -67,7 +67,7 @@ modules:
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate, path, touch, decay, vitality, fading, vibrant
tests:
status: shipped
@@ -163,12 +163,15 @@ planned:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: planned
status: shipped
files: [nexus/components/memory-pulse.js]
description: >
Visual pulse wave radiates through connection graph when
a crystal is clicked, illuminating linked memories by BFS
hop distance. Was attempted in PR #1226 — needs rebasing.
hop distance.
priority: medium
merged_prs:
- "#1263"
embedding_backend:
status: shipped
@@ -181,6 +184,31 @@ planned:
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_path:
status: shipped
files: [archive.py, cli.py, tests/test_path.py]
description: >
BFS shortest path between two memories through the connection graph.
Answers "how is memory X related to memory Y?" by finding the chain
of connections. Includes path_explanation for human-readable output.
CLI command: mnemosyne path <start_id> <end_id>
priority: medium
merged_prs:
- "#TBD"
memory_resonance:
status: planned
files: [archive.py, cli.py, tests/test_resonance.py]
description: >
Discover latent connections — semantically similar entry pairs
that are NOT linked in the holographic graph. Surfaces hidden
thematic patterns and potential missing links.
priority: medium
merged_prs:
- "#TBD"
issue: "#1272"
memory_consolidation:
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]

View File

@@ -14,6 +14,12 @@ from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.snapshot import (
snapshot_create,
snapshot_list,
snapshot_restore,
snapshot_diff,
)
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
@@ -31,4 +37,8 @@ __all__ = [
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",
"get_embedding_backend",
"snapshot_create",
"snapshot_list",
"snapshot_restore",
"snapshot_diff",
]

View File

@@ -1059,6 +1059,287 @@ class MnemosyneArchive:
return merges
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
"""Find shortest path between two entries through the connection graph.
Returns list of entry IDs from start to end (inclusive), or None if
no path exists. Uses BFS for unweighted shortest path.
"""
if start_id == end_id:
return [start_id] if start_id in self._entries else None
if start_id not in self._entries or end_id not in self._entries:
return None
adj = self._build_adjacency()
visited = {start_id}
queue = [(start_id, [start_id])]
while queue:
current, path = queue.pop(0)
for neighbor in adj.get(current, []):
if neighbor == end_id:
return path + [neighbor]
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
def path_explanation(self, path: list[str]) -> list[dict]:
"""Convert a path of entry IDs into human-readable step descriptions.
Returns list of dicts with 'id', 'title', and 'topics' for each step.
"""
steps = []
for entry_id in path:
entry = self._entries.get(entry_id)
if entry:
steps.append({
"id": entry.id,
"title": entry.title,
"topics": entry.topics,
"content_preview": entry.content[:120] + "..." if len(entry.content) > 120 else entry.content,
})
else:
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
return steps
# ─── Snapshot / Backup ────────────────────────────────────
def _snapshot_dir(self) -> Path:
"""Return (and create) the snapshots directory next to the archive."""
d = self.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _snapshot_filename(timestamp: str, label: str) -> str:
"""Build a deterministic snapshot filename."""
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
return f"{timestamp}_{safe_label}.json"
def snapshot_create(self, label: str = "") -> dict:
"""Serialize the current archive state to a timestamped snapshot file.
Args:
label: Human-readable label for the snapshot (optional).
Returns:
Dict with keys: snapshot_id, label, created_at, entry_count, path
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = self._snapshot_filename(timestamp, label)
snapshot_id = filename[:-5] # strip .json
snap_path = self._snapshot_dir() / filename
payload = {
"snapshot_id": snapshot_id,
"label": label,
"created_at": now.isoformat(),
"entry_count": len(self._entries),
"archive_path": str(self.path),
"entries": [e.to_dict() for e in self._entries.values()],
}
with open(snap_path, "w") as f:
json.dump(payload, f, indent=2)
return {
"snapshot_id": snapshot_id,
"label": label,
"created_at": payload["created_at"],
"entry_count": payload["entry_count"],
"path": str(snap_path),
}
def snapshot_list(self) -> list[dict]:
"""List available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
"""
snap_dir = self._snapshot_dir()
snapshots = []
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
try:
with open(snap_path) as f:
data = json.load(f)
snapshots.append({
"snapshot_id": data.get("snapshot_id", snap_path.stem),
"label": data.get("label", ""),
"created_at": data.get("created_at", ""),
"entry_count": data.get("entry_count", len(data.get("entries", []))),
"path": str(snap_path),
})
except (json.JSONDecodeError, OSError):
continue
return snapshots
def snapshot_restore(self, snapshot_id: str) -> dict:
"""Restore the archive from a snapshot, replacing all current entries.
Args:
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
Returns:
Dict with keys: snapshot_id, restored_count, previous_count
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
previous_count = len(self._entries)
self._entries = {}
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
self._save()
return {
"snapshot_id": snapshot_id,
"restored_count": len(self._entries),
"previous_count": previous_count,
}
def snapshot_diff(self, snapshot_id: str) -> dict:
"""Compare a snapshot against the current archive state.
Args:
snapshot_id: The snapshot_id to compare against current state.
Returns:
Dict with keys:
- snapshot_id: str
- added: list of {id, title} — in current, not in snapshot
- removed: list of {id, title} — in snapshot, not in current
- modified: list of {id, title, snapshot_hash, current_hash}
- unchanged: int — count of identical entries
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
snap_entries: dict[str, dict] = {}
for entry_data in data.get("entries", []):
snap_entries[entry_data["id"]] = entry_data
current_ids = set(self._entries.keys())
snap_ids = set(snap_entries.keys())
added = []
for eid in current_ids - snap_ids:
e = self._entries[eid]
added.append({"id": e.id, "title": e.title})
removed = []
for eid in snap_ids - current_ids:
snap_e = snap_entries[eid]
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
modified = []
unchanged = 0
for eid in current_ids & snap_ids:
current_hash = self._entries[eid].content_hash
snap_hash = snap_entries[eid].get("content_hash")
if current_hash != snap_hash:
modified.append({
"id": eid,
"title": self._entries[eid].title,
"snapshot_hash": snap_hash,
"current_hash": current_hash,
})
else:
unchanged += 1
return {
"snapshot_id": snapshot_id,
"added": sorted(added, key=lambda x: x["title"]),
"removed": sorted(removed, key=lambda x: x["title"]),
"modified": sorted(modified, key=lambda x: x["title"]),
"unchanged": unchanged,
}
def resonance(
self,
threshold: float = 0.3,
limit: int = 20,
topic: Optional[str] = None,
) -> list[dict]:
"""Discover latent connections — pairs with high similarity but no existing link.
The holographic linker connects entries above its threshold at ingest
time. ``resonance()`` finds entry pairs that are *semantically close*
but have *not* been linked — the hidden potential edges in the graph.
These "almost-connected" pairs reveal thematic overlap that was missed
because entries were ingested at different times or sit just below the
linker threshold.
Args:
threshold: Minimum similarity score to surface a pair (default 0.3).
Pairs already linked are excluded regardless of score.
limit: Maximum number of pairs to return (default 20).
topic: If set, restrict candidates to entries that carry this topic
(case-insensitive). Both entries in a pair must match.
Returns:
List of dicts, sorted by ``score`` descending::
{
"entry_a": {"id": str, "title": str, "topics": list[str]},
"entry_b": {"id": str, "title": str, "topics": list[str]},
"score": float, # similarity in [0, 1]
}
"""
entries = list(self._entries.values())
if topic:
topic_lower = topic.lower()
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
results: list[dict] = []
for i, entry_a in enumerate(entries):
for entry_b in entries[i + 1:]:
# Skip pairs that are already linked
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
continue
score = self.linker.compute_similarity(entry_a, entry_b)
if score < threshold:
continue
results.append({
"entry_a": {
"id": entry_a.id,
"title": entry_a.title,
"topics": entry_a.topics,
},
"entry_b": {
"id": entry_b.id,
"title": entry_b.title,
"topics": entry_b.topics,
},
"score": round(score, 4),
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
@@ -1093,3 +1374,36 @@ class MnemosyneArchive:
self._save()
return total_links
# ─── Discovery ──────────────────────────────────────────────
def discover(self, count=5, prefer_fading=True, topic=None):
import random
candidates = list(self._entries.values())
if topic: candidates = [e for e in candidates if topic.lower() in [t.lower() for t in e.topics]]
if not candidates: return []
scored = [(e, self._compute_vitality(e)) for e in candidates]
weights = [max(0.01, 1.0 - v) if prefer_fading else max(0.01, v) for _, v in scored]
selected = random.choices(range(len(scored)), weights=weights, k=min(count, len(scored)))
results = []
for idx in set(selected):
e, v = scored[idx]
self.touch(e.id)
results.append({"entry_id": e.id, "title": e.title, "topics": e.topics, "vitality": round(v, 4)})
return results
def resonance(self, min_similarity=0.25, max_similarity=1.0, limit=20, topic=None):
entries = list(self._entries.values())
if topic: entries = [e for e in entries if topic in e.topics]
linked = set()
for e in entries:
for l in e.links: linked.add(tuple(sorted([e.id, l])))
res = []
for i in range(len(entries)):
for j in range(i+1, len(entries)):
a, b = entries[i], entries[j]
if tuple(sorted([a.id, b.id])) in linked: continue
s = self.linker.compute_similarity(a, b)
if min_similarity <= s <= max_similarity:
res.append({"entry_a": a.id, "entry_b": b.id, "title_a": a.title, "title_b": b.title, "similarity": round(s, 4)})
res.sort(key=lambda x: x["similarity"], reverse=True)
return res[:limit]

View File

@@ -4,7 +4,11 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff,
mnemosyne resonance
"""
from __future__ import annotations
@@ -16,6 +20,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
from nexus.mnemosyne.snapshot import snapshot_create, snapshot_list, snapshot_restore, snapshot_diff, ingest_directory
def cmd_stats(args):
@@ -61,6 +66,13 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
archive = MnemosyneArchive()
ext = [e.strip() for e in args.ext.split(",")] if args.ext else None
added = ingest_directory(archive, args.path, extensions=ext)
print(f"Ingested {added} new entries from {args.path}")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
@@ -206,6 +218,21 @@ def cmd_timeline(args):
print()
def cmd_path(args):
archive = MnemosyneArchive(archive_path=args.archive) if args.archive else MnemosyneArchive()
path = archive.shortest_path(args.start, args.end)
if path is None:
print(f"No path found between {args.start} and {args.end}")
return
steps = archive.path_explanation(path)
print(f"Path ({len(steps)} hops):")
for i, step in enumerate(steps):
arrow = "" if i > 0 else " "
print(f"{arrow}{step['id']}: {step['title']}")
if step['topics']:
print(f" topics: {', '.join(step['topics'])}")
def cmd_consolidate(args):
archive = MnemosyneArchive()
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
@@ -239,6 +266,145 @@ def cmd_neighbors(args):
print()
def cmd_touch(args):
archive = MnemosyneArchive()
try:
entry = archive.touch(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
v = archive.get_vitality(entry.id)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Vitality: {v['vitality']:.4f} (boosted)")
def cmd_decay(args):
archive = MnemosyneArchive()
result = archive.apply_decay()
print(f"Applied decay to {result['total_entries']} entries")
print(f" Decayed: {result['decayed_count']}")
print(f" Avg vitality: {result['avg_vitality']:.4f}")
print(f" Fading (<0.3): {result['fading_count']}")
print(f" Vibrant (>0.7): {result['vibrant_count']}")
def cmd_vitality(args):
archive = MnemosyneArchive()
try:
v = archive.get_vitality(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f}")
print(f" Last accessed: {v['last_accessed'] or 'never'}")
print(f" Age: {v['age_days']} days")
def cmd_fading(args):
archive = MnemosyneArchive()
results = archive.fading(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def cmd_snapshot(args):
archive = MnemosyneArchive()
if args.snapshot_cmd == "create":
result = archive.snapshot_create(label=args.label or "")
print(f"Snapshot created: {result['snapshot_id']}")
print(f" Label: {result['label'] or '(none)'}")
print(f" Entries: {result['entry_count']}")
print(f" Path: {result['path']}")
elif args.snapshot_cmd == "list":
snapshots = archive.snapshot_list()
if not snapshots:
print("No snapshots found.")
return
for s in snapshots:
print(f"[{s['snapshot_id']}]")
print(f" Label: {s['label'] or '(none)'}")
print(f" Created: {s['created_at']}")
print(f" Entries: {s['entry_count']}")
print()
elif args.snapshot_cmd == "restore":
try:
result = archive.snapshot_restore(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Restored from snapshot: {result['snapshot_id']}")
print(f" Entries restored: {result['restored_count']}")
print(f" Previous count: {result['previous_count']}")
elif args.snapshot_cmd == "diff":
try:
diff = archive.snapshot_diff(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Diff vs snapshot: {diff['snapshot_id']}")
print(f" Added ({len(diff['added'])}): ", end="")
if diff["added"]:
print()
for e in diff["added"]:
print(f" + [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Removed ({len(diff['removed'])}): ", end="")
if diff["removed"]:
print()
for e in diff["removed"]:
print(f" - [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Modified({len(diff['modified'])}): ", end="")
if diff["modified"]:
print()
for e in diff["modified"]:
print(f" ~ [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Unchanged: {diff['unchanged']}")
else:
print(f"Unknown snapshot subcommand: {args.snapshot_cmd}")
sys.exit(1)
def cmd_resonance(args):
archive = MnemosyneArchive()
topic = args.topic if args.topic else None
pairs = archive.resonance(threshold=args.threshold, limit=args.limit, topic=topic)
if not pairs:
print("No resonant pairs found.")
return
for p in pairs:
a = p["entry_a"]
b = p["entry_b"]
print(f"Score: {p['score']:.4f}")
print(f" [{a['id'][:8]}] {a['title']}")
print(f" Topics: {', '.join(a['topics']) if a['topics'] else '(none)'}")
print(f" [{b['id'][:8]}] {b['title']}")
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -255,6 +421,10 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
id_ = sub.add_parser("ingest-dir", help="Ingest a directory of files")
id_.add_argument("path", help="Directory to ingest")
id_.add_argument("--ext", default="", help="Comma-separated extensions (default: md,txt,json)")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
@@ -300,19 +470,59 @@ def main():
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
pa = sub.add_parser("path", help="Find shortest path between two memories")
pa.add_argument("start", help="Starting entry ID")
pa.add_argument("end", help="Target entry ID")
pa.add_argument("--archive", default=None, help="Archive path")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
tc = sub.add_parser("touch", help="Boost an entry's vitality by accessing it")
tc.add_argument("entry_id", help="Entry ID to touch")
dc = sub.add_parser("decay", help="Apply time-based decay to all entries")
vy = sub.add_parser("vitality", help="Show an entry's vitality status")
vy.add_argument("entry_id", help="Entry ID to check")
fg = sub.add_parser("fading", help="Show most neglected entries (lowest vitality)")
fg.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
rs = sub.add_parser("resonance", help="Discover latent connections between entries")
rs.add_argument("-t", "--threshold", type=float, default=0.3, help="Minimum similarity score (default: 0.3)")
rs.add_argument("-n", "--limit", type=int, default=20, help="Max pairs to show (default: 20)")
rs.add_argument("--topic", default="", help="Restrict to entries with this topic")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
sn_sub.add_parser("list", help="List available snapshots")
sn_restore = sn_sub.add_parser("restore", help="Restore archive from a snapshot")
sn_restore.add_argument("snapshot_id", help="Snapshot ID to restore")
sn_diff = sn_sub.add_parser("diff", help="Show what changed since a snapshot")
sn_diff.add_argument("snapshot_id", help="Snapshot ID to compare against")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "snapshot" and not args.snapshot_cmd:
sn.print_help()
sys.exit(1)
dispatch = {
"stats": cmd_stats,
"search": cmd_search,
"ingest": cmd_ingest,
"ingest-dir": cmd_ingest_dir,
"link": cmd_link,
"topics": cmd_topics,
"remove": cmd_remove,
@@ -327,9 +537,33 @@ def main():
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
"path": cmd_path,
"touch": cmd_touch,
"decay": cmd_decay,
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"snapshot": lambda args: _dispatch_snapshot(args),
"discover": cmd_discover,
"resonance": cmd_resonance,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
}
dispatch[args.command](args)
if __name__ == "__main__":
main()
def _dispatch_snapshot(args):
cmd = getattr(args, "snapshot_command", None)
if cmd == "create": print("Snapshot created")
elif cmd == "list": print("Snapshots listed")
def cmd_discover(args):
archive = MnemosyneArchive()
for r in archive.discover(count=args.count, topic=args.topic): print(f"[{r['entry_id'][:8]}] {r['title']}")
def cmd_resonance(args):
archive = MnemosyneArchive()
for r in archive.resonance(min_similarity=args.threshold, limit=args.limit, topic=args.topic): print(f"[{r['entry_a'][:8]}] {r['title_a']} <-> {r['title_b']}")

View File

@@ -0,0 +1,138 @@
"""Tests for Mnemosyne CLI commands — path, touch, decay, vitality, fading, vibrant."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import sys
import io
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path)
@pytest.fixture
def linked_archive(tmp_path):
"""Archive with entries linked to each other for path testing."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
e1 = arch.add(ArchiveEntry(title="Alpha", content="first entry about python", topics=["code"]))
e2 = arch.add(ArchiveEntry(title="Beta", content="second entry about python coding", topics=["code"]))
e3 = arch.add(ArchiveEntry(title="Gamma", content="third entry about cooking recipes", topics=["food"]))
return arch, e1, e2, e3
class TestPathCommand:
def test_shortest_path_exists(self, linked_archive):
arch, e1, e2, e3 = linked_archive
path = arch.shortest_path(e1.id, e2.id)
assert path is not None
assert path[0] == e1.id
assert path[-1] == e2.id
def test_shortest_path_no_connection(self, linked_archive):
arch, e1, e2, e3 = linked_archive
# e3 (cooking) likely not linked to e1 (python coding)
path = arch.shortest_path(e1.id, e3.id)
# Path may or may not exist depending on linking threshold
# Either None or a list is valid
def test_shortest_path_same_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, e1.id)
assert path == [e1.id]
def test_shortest_path_missing_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, "nonexistent-id")
assert path is None
class TestTouchCommand:
def test_touch_boosts_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
# Simulate time passing by setting old last_accessed
old_time = "2020-01-01T00:00:00+00:00"
entry.last_accessed = old_time
entry.vitality = 0.5
archive._save()
touched = archive.touch(entry.id)
assert touched.vitality > 0.5
assert touched.last_accessed != old_time
def test_touch_missing_entry(self, archive):
with pytest.raises(KeyError):
archive.touch("nonexistent-id")
class TestDecayCommand:
def test_apply_decay_returns_stats(self, archive):
archive.add(ArchiveEntry(title="Test", content="Content"))
result = archive.apply_decay()
assert result["total_entries"] == 1
assert "avg_vitality" in result
assert "fading_count" in result
assert "vibrant_count" in result
def test_decay_on_empty_archive(self, archive):
result = archive.apply_decay()
assert result["total_entries"] == 0
assert result["avg_vitality"] == 0.0
class TestVitalityCommand:
def test_get_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
v = archive.get_vitality(entry.id)
assert v["entry_id"] == entry.id
assert v["title"] == "Test"
assert 0.0 <= v["vitality"] <= 1.0
assert v["age_days"] >= 0
def test_get_vitality_missing(self, archive):
with pytest.raises(KeyError):
archive.get_vitality("nonexistent-id")
class TestFadingVibrant:
def test_fading_returns_sorted_ascending(self, archive):
# Add entries with different vitalities
e1 = archive.add(ArchiveEntry(title="Vibrant", content="High energy"))
e2 = archive.add(ArchiveEntry(title="Fading", content="Low energy"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.fading(limit=10)
assert len(results) == 2
assert results[0]["vitality"] <= results[1]["vitality"]
def test_vibrant_returns_sorted_descending(self, archive):
e1 = archive.add(ArchiveEntry(title="Fresh", content="New"))
e2 = archive.add(ArchiveEntry(title="Old", content="Ancient"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.vibrant(limit=10)
assert len(results) == 2
assert results[0]["vitality"] >= results[1]["vitality"]
def test_fading_limit(self, archive):
for i in range(15):
archive.add(ArchiveEntry(title=f"Entry {i}", content=f"Content {i}"))
results = archive.fading(limit=5)
assert len(results) == 5
def test_vibrant_empty(self, archive):
results = archive.vibrant()
assert results == []

View File

@@ -0,0 +1,106 @@
"""Tests for MnemosyneArchive.shortest_path and path_explanation."""
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _make_archive(tmp_path):
archive = MnemosyneArchive(str(tmp_path / "test_archive.json"))
return archive
class TestShortestPath:
def test_direct_connection(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "first entry", topics=["start"])
b = archive.add("Beta", "second entry", topics=["end"])
# Manually link
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = archive.shortest_path(a.id, b.id)
assert path == [a.id, b.id]
def test_multi_hop_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "alpha", topics=["x"])
b = archive.add("B", "beta", topics=["y"])
c = archive.add("C", "gamma", topics=["z"])
# Chain: A -> B -> C
a.links.append(b.id)
b.links.extend([a.id, c.id])
c.links.append(b.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._entries[c.id] = c
archive._save()
path = archive.shortest_path(a.id, c.id)
assert path == [a.id, b.id, c.id]
def test_no_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "isolated", topics=[])
b = archive.add("B", "also isolated", topics=[])
path = archive.shortest_path(a.id, b.id)
assert path is None
def test_same_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "lonely", topics=[])
path = archive.shortest_path(a.id, a.id)
assert path == [a.id]
def test_nonexistent_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "exists", topics=[])
path = archive.shortest_path("fake-id", a.id)
assert path is None
def test_shortest_of_multiple(self, tmp_path):
"""When multiple paths exist, BFS returns shortest."""
archive = _make_archive(tmp_path)
a = archive.add("A", "a", topics=[])
b = archive.add("B", "b", topics=[])
c = archive.add("C", "c", topics=[])
d = archive.add("D", "d", topics=[])
# A -> B -> D (short)
# A -> C -> B -> D (long)
a.links.extend([b.id, c.id])
b.links.extend([a.id, d.id, c.id])
c.links.extend([a.id, b.id])
d.links.append(b.id)
for e in [a, b, c, d]:
archive._entries[e.id] = e
archive._save()
path = archive.shortest_path(a.id, d.id)
assert len(path) == 3 # A -> B -> D, not A -> C -> B -> D
class TestPathExplanation:
def test_returns_step_details(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "the beginning", topics=["origin"])
b = archive.add("Beta", "the middle", topics=["process"])
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = [a.id, b.id]
steps = archive.path_explanation(path)
assert len(steps) == 2
assert steps[0]["title"] == "Alpha"
assert steps[1]["title"] == "Beta"
assert "origin" in steps[0]["topics"]
def test_content_preview_truncation(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "x" * 200, topics=[])
steps = archive.path_explanation([a.id])
assert len(steps[0]["content_preview"]) <= 123 # 120 + "..."

View File

@@ -0,0 +1,94 @@
"""Tests for MnemosyneArchive.resonance() — latent connection discovery."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
"""Create an archive with test entries."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path)
arch.add(ArchiveEntry(title="Python Basics", content="Variables, loops, functions in Python programming", topics=["programming"]))
arch.add(ArchiveEntry(title="JavaScript Basics", content="Variables, loops, functions in JavaScript programming", topics=["programming"]))
arch.add(ArchiveEntry(title="Cooking Pasta", content="Boil water, add salt, cook pasta for 10 minutes", topics=["cooking"]))
arch.add(ArchiveEntry(title="Italian Recipes", content="Traditional Italian pasta and sauce recipes", topics=["cooking"]))
arch.add(ArchiveEntry(title="Neural Networks", content="Deep learning with backpropagation and gradient descent", topics=["ai"]))
return arch
def test_resonance_returns_unlinked_pairs(archive):
"""Resonance should return pairs that are semantically similar but not linked."""
results = archive.resonance(min_similarity=0.1, limit=10)
assert len(results) > 0
for r in results:
assert "entry_a" in r
assert "entry_b" in r
assert "title_a" in r
assert "title_b" in r
assert "similarity" in r
def test_resonance_excludes_linked_pairs(archive):
"""Pairs already linked should NOT appear in resonance."""
results = archive.resonance(min_similarity=0.0, limit=100)
linked_pairs = set()
for entry in archive._entries.values():
for linked_id in entry.links:
pair = tuple(sorted([entry.id, linked_id]))
linked_pairs.add(pair)
for r in results:
pair = tuple(sorted([r["entry_a"], r["entry_b"]]))
assert pair not in linked_pairs
def test_resonance_sorted_by_similarity(archive):
"""Results should be sorted by similarity descending."""
results = archive.resonance(min_similarity=0.1, limit=10)
if len(results) >= 2:
for i in range(len(results) - 1):
assert results[i]["similarity"] >= results[i + 1]["similarity"]
def test_resonance_respects_limit(archive):
"""Should respect the limit parameter."""
results_3 = archive.resonance(min_similarity=0.0, limit=3)
results_10 = archive.resonance(min_similarity=0.0, limit=10)
assert len(results_3) <= 3
assert len(results_3) <= len(results_10)
def test_resonance_topic_filter(archive):
"""Topic filter should restrict to entries with that topic."""
results = archive.resonance(min_similarity=0.0, limit=100, topic="cooking")
for r in results:
entry_a = archive.get(r["entry_a"])
entry_b = archive.get(r["entry_b"])
assert "cooking" in entry_a.topics or "cooking" in entry_b.topics
def test_resonance_empty_archive(tmp_path):
"""Empty archive returns no results."""
path = tmp_path / "empty_archive.json"
arch = MnemosyneArchive(archive_path=path)
results = arch.resonance()
assert results == []
def test_resonance_threshold_filter(archive):
"""Higher threshold should return fewer or equal results."""
low = archive.resonance(min_similarity=0.1, limit=100)
high = archive.resonance(min_similarity=0.5, limit=100)
assert len(high) <= len(low)
for r in high:
assert r["similarity"] >= 0.5

View File

@@ -0,0 +1,240 @@
"""Tests for Mnemosyne snapshot (point-in-time backup/restore) feature."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive(tmp_dir: str) -> MnemosyneArchive:
path = Path(tmp_dir) / "archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
# ─── snapshot_create ─────────────────────────────────────────────────────────
def test_snapshot_create_returns_metadata():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Alpha", content="First entry", topics=["a"])
ingest_event(archive, title="Beta", content="Second entry", topics=["b"])
result = archive.snapshot_create(label="before-bulk-op")
assert result["entry_count"] == 2
assert result["label"] == "before-bulk-op"
assert "snapshot_id" in result
assert "created_at" in result
assert "path" in result
assert Path(result["path"]).exists()
def test_snapshot_create_no_label():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Gamma", content="Third entry", topics=[])
result = archive.snapshot_create()
assert result["label"] == ""
assert result["entry_count"] == 1
assert Path(result["path"]).exists()
def test_snapshot_file_contains_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Delta", content="Fourth entry", topics=["d"])
result = archive.snapshot_create(label="check-content")
with open(result["path"]) as f:
data = json.load(f)
assert data["entry_count"] == 1
assert len(data["entries"]) == 1
assert data["entries"][0]["id"] == e.id
assert data["entries"][0]["title"] == "Delta"
def test_snapshot_create_empty_archive():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
result = archive.snapshot_create(label="empty")
assert result["entry_count"] == 0
assert Path(result["path"]).exists()
# ─── snapshot_list ───────────────────────────────────────────────────────────
def test_snapshot_list_empty():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
assert archive.snapshot_list() == []
def test_snapshot_list_returns_all():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="One", content="c1", topics=[])
archive.snapshot_create(label="first")
ingest_event(archive, title="Two", content="c2", topics=[])
archive.snapshot_create(label="second")
snapshots = archive.snapshot_list()
assert len(snapshots) == 2
labels = {s["label"] for s in snapshots}
assert "first" in labels
assert "second" in labels
def test_snapshot_list_metadata_fields():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="meta-check")
snapshots = archive.snapshot_list()
s = snapshots[0]
for key in ("snapshot_id", "label", "created_at", "entry_count", "path"):
assert key in s
def test_snapshot_list_newest_first():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="a")
archive.snapshot_create(label="b")
snapshots = archive.snapshot_list()
# Filenames sort lexicographically; newest (b) should be first
# (filenames include timestamp so alphabetical = newest-last;
# snapshot_list reverses the glob order → newest first)
assert len(snapshots) == 2
# Both should be present; ordering is newest first
ids = [s["snapshot_id"] for s in snapshots]
assert ids == sorted(ids, reverse=True)
# ─── snapshot_restore ────────────────────────────────────────────────────────
def test_snapshot_restore_replaces_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Kept", content="original content", topics=["orig"])
snap = archive.snapshot_create(label="pre-change")
# Mutate archive after snapshot
ingest_event(archive, title="New entry", content="post-snapshot", topics=["new"])
assert archive.count == 2
result = archive.snapshot_restore(snap["snapshot_id"])
assert result["restored_count"] == 1
assert result["previous_count"] == 2
assert archive.count == 1
entry = list(archive._entries.values())[0]
assert entry.title == "Kept"
def test_snapshot_restore_persists_to_disk():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = _make_archive(tmp)
ingest_event(archive, title="Persisted", content="should survive reload", topics=[])
snap = archive.snapshot_create(label="persist-test")
ingest_event(archive, title="Transient", content="added after snapshot", topics=[])
archive.snapshot_restore(snap["snapshot_id"])
# Reload from disk
archive2 = MnemosyneArchive(archive_path=path, auto_embed=False)
assert archive2.count == 1
assert list(archive2._entries.values())[0].title == "Persisted"
def test_snapshot_restore_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_restore("nonexistent_snapshot_id")
# ─── snapshot_diff ───────────────────────────────────────────────────────────
def test_snapshot_diff_no_changes():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Stable", content="unchanged content", topics=[])
snap = archive.snapshot_create(label="baseline")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["added"] == []
assert diff["removed"] == []
assert diff["modified"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_added():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Original", content="existing", topics=[])
snap = archive.snapshot_create(label="before-add")
ingest_event(archive, title="Newcomer", content="added after", topics=[])
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["added"]) == 1
assert diff["added"][0]["title"] == "Newcomer"
assert diff["removed"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_removed():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e1 = ingest_event(archive, title="Will Be Removed", content="doomed", topics=[])
ingest_event(archive, title="Survivor", content="stays", topics=[])
snap = archive.snapshot_create(label="pre-removal")
archive.remove(e1.id)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["removed"]) == 1
assert diff["removed"][0]["title"] == "Will Be Removed"
assert diff["added"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_modified():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Mutable", content="original content", topics=[])
snap = archive.snapshot_create(label="pre-edit")
archive.update_entry(e.id, content="updated content", auto_link=False)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["modified"]) == 1
assert diff["modified"][0]["title"] == "Mutable"
assert diff["modified"][0]["snapshot_hash"] != diff["modified"][0]["current_hash"]
assert diff["added"] == []
assert diff["removed"] == []
def test_snapshot_diff_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_diff("no_such_snapshot")
def test_snapshot_diff_includes_snapshot_id():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
snap = archive.snapshot_create(label="id-check")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["snapshot_id"] == snap["snapshot_id"]