Compare commits


5 Commits

SHA1 Message Date
e7754ce101 docs: update __init__.py docstring for consolidation (#1260) 2026-04-12 06:21:02 +00:00
Some checks failed:
CI / test (pull_request) Failing after 8s
CI / validate (pull_request) Failing after 12s
Review Approval Gate / verify-review (pull_request) Failing after 2s
2fa8b5d99b docs: mark memory_consolidation as shipped (#1260) 2026-04-12 06:20:40 +00:00
bb856765ce test: add consolidation tests (#1260)
Covers exact duplicate detection, dry-run, topic/link
merging, triple duplicates, and link repair.
2026-04-12 06:20:39 +00:00
1e110922b2 feat: add 'mnemosyne consolidate' CLI command (#1260)
Supports --dry-run and --threshold options for safe
duplicate detection and merging.
2026-04-12 06:20:06 +00:00
b308e627b8 feat: add consolidate() method to MnemosyneArchive (#1260)
Scans for exact and near-duplicate entries by content_hash
and embedding similarity. Merges older entry with union of
topics, links, and metadata from duplicates.
2026-04-12 06:18:48 +00:00
11 changed files with 239 additions and 1532 deletions
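Taken together, the five commits add consolidation as both an archive method and a CLI command. A minimal usage sketch against the API added in this range (path and threshold illustrative; auto_embed=False mirrors the new tests):

from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive

archive = MnemosyneArchive(archive_path=Path("archive.json"), auto_embed=False)

# Dry run: report would-be merges without mutating the archive.
for m in archive.consolidate(similarity_threshold=0.9, dry_run=True):
    print(m["reason"], m["kept_id"][:8], "<-", m["removed_id"][:8], m["similarity"])

# Apply: the older entry of each pair survives with unioned topics, links, and metadata.
archive.consolidate(similarity_threshold=0.9, dry_run=False)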

app.js
View File

@@ -7,7 +7,6 @@ import { SpatialMemory } from './nexus/components/spatial-memory.js';
import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
@@ -716,7 +715,6 @@ async function init() {
MemoryBirth.wrapSpatialMemory(SpatialMemory);
SpatialMemory.setCamera(camera);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(SpatialMemory);
updateLoad(90);
loadSession();
@@ -1947,7 +1945,6 @@ function setupControls() {
const entry = SpatialMemory.getMemoryFromMesh(hits[0].object);
if (entry) {
SpatialMemory.highlightMemory(entry.data.id);
MemoryPulse.triggerPulse(entry.data.id);
const regionDef = SpatialMemory.REGIONS[entry.region] || SpatialMemory.REGIONS.working;
MemoryInspect.show(entry.data, regionDef);
}
@@ -2927,7 +2924,6 @@ function gameLoop() {
if (typeof animateMemoryOrbs === 'function') {
SpatialMemory.update(delta);
MemoryBirth.update(delta);
MemoryPulse.update();
animateMemoryOrbs(delta);
}

nexus/components/memory-pulse.js
View File

@@ -1,160 +0,0 @@
// ═══════════════════════════════════════════════════
// PROJECT MNEMOSYNE — MEMORY PULSE
// ═══════════════════════════════════════════════════
//
// BFS wave animation triggered on crystal click.
// When a memory crystal is clicked, a visual pulse
// radiates through the connection graph — illuminating
// linked memories hop-by-hop with a glow that rises
// sharply and then fades.
//
// Usage:
// MemoryPulse.init(SpatialMemory);
// MemoryPulse.triggerPulse(memId);
// MemoryPulse.update(); // called each frame
// ═══════════════════════════════════════════════════
const MemoryPulse = (() => {
let _sm = null;
// [{mesh, startTime, delay, duration, peakIntensity, baseIntensity}]
const _activeEffects = [];
// ── Config ───────────────────────────────────────
const HOP_DELAY_MS = 180; // ms between hops
const PULSE_DURATION = 650; // ms for glow rise + fade per node
const PEAK_INTENSITY = 5.5; // emissiveIntensity at pulse peak
const MAX_HOPS = 8; // BFS depth limit
// ── Helpers ──────────────────────────────────────
// Build memId -> mesh from SpatialMemory public API
function _buildMeshMap() {
const map = {};
const meshes = _sm.getCrystalMeshes();
for (const mesh of meshes) {
const entry = _sm.getMemoryFromMesh(mesh);
if (entry) map[entry.data.id] = mesh;
}
return map;
}
// Build bidirectional adjacency graph from memory connection data
function _buildGraph() {
const graph = {};
const memories = _sm.getAllMemories();
for (const mem of memories) {
if (!graph[mem.id]) graph[mem.id] = [];
if (mem.connections) {
for (const targetId of mem.connections) {
graph[mem.id].push(targetId);
if (!graph[targetId]) graph[targetId] = [];
graph[targetId].push(mem.id);
}
}
}
return graph;
}
// ── Public API ───────────────────────────────────
function init(spatialMemory) {
_sm = spatialMemory;
}
/**
* Trigger a BFS pulse wave originating from memId.
* Each hop level illuminates after HOP_DELAY_MS * hop ms.
* @param {string} memId - ID of the clicked memory crystal
*/
function triggerPulse(memId) {
if (!_sm) return;
const meshMap = _buildMeshMap();
const graph = _buildGraph();
if (!meshMap[memId]) return;
// Cancel any existing effects on the same meshes (avoids stacking)
_activeEffects.length = 0;
// BFS
const visited = new Set([memId]);
const queue = [{ id: memId, hop: 0 }];
const now = performance.now();
const scheduled = [];
while (queue.length > 0) {
const { id, hop } = queue.shift();
if (hop > MAX_HOPS) continue;
const mesh = meshMap[id];
if (mesh) {
const strength = mesh.userData.strength || 0.7;
const baseIntensity = 1.0 + Math.sin(mesh.userData.pulse || 0) * 0.5 * strength;
scheduled.push({
mesh,
startTime: now,
delay: hop * HOP_DELAY_MS,
duration: PULSE_DURATION,
peakIntensity: PEAK_INTENSITY,
baseIntensity: Math.max(0.5, baseIntensity)
});
}
for (const neighborId of (graph[id] || [])) {
if (!visited.has(neighborId)) {
visited.add(neighborId);
queue.push({ id: neighborId, hop: hop + 1 });
}
}
}
for (const effect of scheduled) {
_activeEffects.push(effect);
}
console.info('[MemoryPulse] Pulse triggered from', memId, '—', scheduled.length, 'nodes in wave');
}
/**
* Advance all active pulse animations. Call once per frame.
*/
function update() {
if (_activeEffects.length === 0) return;
const now = performance.now();
for (let i = _activeEffects.length - 1; i >= 0; i--) {
const e = _activeEffects[i];
const elapsed = now - e.startTime - e.delay;
if (elapsed < 0) continue; // waiting for its hop delay
if (elapsed >= e.duration) {
// Animation complete — restore base intensity
if (e.mesh.material) {
e.mesh.material.emissiveIntensity = e.baseIntensity;
}
_activeEffects.splice(i, 1);
continue;
}
// t: 0 → 1 over duration
const t = elapsed / e.duration;
// sin curve over [0, π]: smooth rise then fall
const glow = Math.sin(t * Math.PI);
if (e.mesh.material) {
e.mesh.material.emissiveIntensity =
e.baseIntensity + glow * (e.peakIntensity - e.baseIntensity);
}
}
}
return { init, triggerPulse, update };
})();
export { MemoryPulse };
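The heart of the deleted effect is BFS hop distance driving a per-node start delay (the MAX_HOPS cap is elided here). A minimal Python sketch of the same scheduling, with an illustrative graph:

from collections import deque

HOP_DELAY_MS = 180  # ms between hops, matching the config above

def schedule_pulse(graph: dict[str, list[str]], origin: str) -> dict[str, float]:
    """Per-node start delay: hop_distance * HOP_DELAY_MS."""
    delays = {origin: 0.0}
    queue = deque([(origin, 0)])
    while queue:
        node, hop = queue.popleft()
        for neighbor in graph.get(node, []):
            if neighbor not in delays:  # membership doubles as the visited set
                delays[neighbor] = (hop + 1) * HOP_DELAY_MS
                queue.append((neighbor, hop + 1))
    return delays

print(schedule_pulse({"a": ["b"], "b": ["a", "c"], "c": ["b"]}, "a"))
# {'a': 0.0, 'b': 180, 'c': 360}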

View File

@@ -67,7 +67,7 @@ modules:
cli:
status: shipped
files: [cli.py]
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors, consolidate, path, touch, decay, vitality, fading, vibrant
description: CLI interface — stats, search, ingest, link, topics, remove, export, clusters, hubs, bridges, rebuild, tag/untag/retag, timeline, neighbors
tests:
status: shipped
@@ -163,15 +163,12 @@ planned:
- "#TBD" # Will be filled when PR is created
memory_pulse:
status: shipped
files: [nexus/components/memory-pulse.js]
status: planned
description: >
Visual pulse wave radiates through connection graph when
a crystal is clicked, illuminating linked memories by BFS
hop distance.
hop distance. Was attempted in PR #1226 — needs rebasing.
priority: medium
merged_prs:
- "#1263"
embedding_backend:
status: shipped
@@ -184,26 +181,13 @@ planned:
merged_prs:
- "#TBD" # Will be filled when PR is created
memory_path:
status: shipped
files: [archive.py, cli.py, tests/test_path.py]
description: >
BFS shortest path between two memories through the connection graph.
Answers "how is memory X related to memory Y?" by finding the chain
of connections. Includes path_explanation for human-readable output.
CLI command: mnemosyne path <start_id> <end_id>
priority: medium
merged_prs:
- "#TBD"
memory_consolidation:
status: shipped
files: [archive.py, cli.py, tests/test_consolidation.py]
files: [archive.py, cli.py]
description: >
Automatic merging of duplicate/near-duplicate memories
using content_hash and semantic similarity. Periodic
consolidation pass.
priority: low
merged_prs:
- "#1260"
- "#TBD" # Will be filled when PR is created

nexus/mnemosyne/__init__.py
View File

@@ -1,7 +1,7 @@
"""nexus.mnemosyne — The Living Holographic Archive.
Phase 1: Foundation — core archive, entry model, holographic linker,
ingestion pipeline, and CLI.
ingestion pipeline, memory consolidation, and CLI.
Builds on MemPalace vector memory to create interconnected meaning:
entries auto-reference related entries via semantic similarity,

nexus/mnemosyne/archive.py
View File

@@ -938,342 +938,6 @@ class MnemosyneArchive:
"vibrant_count": vibrant_count,
}
def consolidate(
self,
threshold: float = 0.9,
dry_run: bool = False,
) -> list[dict]:
"""Scan the archive and merge duplicate/near-duplicate entries.
Two entries are considered duplicates if:
- They share the same ``content_hash`` (exact duplicate), or
- Their similarity score (via HolographicLinker) exceeds ``threshold``
(near-duplicate when an embedding backend is available or Jaccard is
high enough at the given threshold).
Merge strategy:
- Keep the *older* entry (earlier ``created_at``).
- Union topics from both entries (case-deduped).
- Merge metadata from newer into older (older values win on conflicts).
- Transfer all links from the newer entry to the older entry.
- Delete the newer entry.
Args:
threshold: Similarity threshold for near-duplicate detection (0.0–1.0).
Default 0.9 is intentionally conservative.
dry_run: If True, return the list of would-be merges without mutating
the archive.
Returns:
List of dicts, one per merged pair::
{
"kept": <entry_id of survivor>,
"removed": <entry_id of duplicate>,
"reason": "exact_hash" | "semantic_similarity",
"score": float, # 1.0 for exact hash matches
"dry_run": bool,
}
"""
merges: list[dict] = []
entries = list(self._entries.values())
removed_ids: set[str] = set()
for i, entry_a in enumerate(entries):
if entry_a.id in removed_ids:
continue
for entry_b in entries[i + 1:]:
if entry_b.id in removed_ids:
continue
# Determine if they are duplicates
reason: Optional[str] = None
score: float = 0.0
if (
entry_a.content_hash is not None
and entry_b.content_hash is not None
and entry_a.content_hash == entry_b.content_hash
):
reason = "exact_hash"
score = 1.0
else:
sim = self.linker.compute_similarity(entry_a, entry_b)
if sim >= threshold:
reason = "semantic_similarity"
score = sim
if reason is None:
continue
# Decide which entry to keep (older survives)
if entry_a.created_at <= entry_b.created_at:
kept, removed = entry_a, entry_b
else:
kept, removed = entry_b, entry_a
merges.append({
"kept": kept.id,
"removed": removed.id,
"reason": reason,
"score": round(score, 4),
"dry_run": dry_run,
})
if not dry_run:
# Merge topics (case-deduped)
existing_lower = {t.lower() for t in kept.topics}
for tag in removed.topics:
if tag.lower() not in existing_lower:
kept.topics.append(tag)
existing_lower.add(tag.lower())
# Merge metadata (kept wins on key conflicts)
for k, v in removed.metadata.items():
if k not in kept.metadata:
kept.metadata[k] = v
# Transfer links: add removed's links to kept
kept_links_set = set(kept.links)
for lid in removed.links:
if lid != kept.id and lid not in kept_links_set and lid not in removed_ids:
kept.links.append(lid)
kept_links_set.add(lid)
# Update the other entry's back-link
other = self._entries.get(lid)
if other and kept.id not in other.links:
other.links.append(kept.id)
# Remove back-links pointing at the removed entry
for other in self._entries.values():
if removed.id in other.links:
other.links.remove(removed.id)
if other.id != kept.id and kept.id not in other.links:
other.links.append(kept.id)
del self._entries[removed.id]
removed_ids.add(removed.id)
if not dry_run and merges:
self._save()
return merges
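# For comparison with the replacement added later in this diff: this outgoing
# API takes threshold= and reports kept/removed/score. A hedged sketch of a
# call against it, archive instance assumed:
#
#     plan = archive.consolidate(threshold=0.9, dry_run=True)
#     for m in plan:
#         print(m["kept"][:8], "<-", m["removed"][:8], m["reason"], m["score"])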
def shortest_path(self, start_id: str, end_id: str) -> list[str] | None:
"""Find shortest path between two entries through the connection graph.
Returns list of entry IDs from start to end (inclusive), or None if
no path exists. Uses BFS for unweighted shortest path.
"""
if start_id == end_id:
return [start_id] if start_id in self._entries else None
if start_id not in self._entries or end_id not in self._entries:
return None
adj = self._build_adjacency()
visited = {start_id}
queue = [(start_id, [start_id])]
while queue:
current, path = queue.pop(0)
for neighbor in adj.get(current, []):
if neighbor == end_id:
return path + [neighbor]
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))
return None
def path_explanation(self, path: list[str]) -> list[dict]:
"""Convert a path of entry IDs into human-readable step descriptions.
Returns list of dicts with 'id', 'title', and 'topics' for each step.
"""
steps = []
for entry_id in path:
entry = self._entries.get(entry_id)
if entry:
steps.append({
"id": entry.id,
"title": entry.title,
"topics": entry.topics,
"content_preview": entry.content[:120] + "..." if len(entry.content) > 120 else entry.content,
})
else:
steps.append({"id": entry_id, "title": "[unknown]", "topics": []})
return steps
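# Usage sketch (IDs illustrative; mirrors what cmd_path prints in cli.py):
#
#     route = archive.shortest_path(start_id, end_id)  # list of entry IDs or None
#     if route is not None:
#         for step in archive.path_explanation(route):
#             print(step["id"][:8], step["title"], ", ".join(step["topics"]))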
# ─── Snapshot / Backup ────────────────────────────────────
def _snapshot_dir(self) -> Path:
"""Return (and create) the snapshots directory next to the archive."""
d = self.path.parent / "snapshots"
d.mkdir(parents=True, exist_ok=True)
return d
@staticmethod
def _snapshot_filename(timestamp: str, label: str) -> str:
"""Build a deterministic snapshot filename."""
safe_label = "".join(c if c.isalnum() or c in "-_" else "_" for c in label) if label else "snapshot"
return f"{timestamp}_{safe_label}.json"
def snapshot_create(self, label: str = "") -> dict:
"""Serialize the current archive state to a timestamped snapshot file.
Args:
label: Human-readable label for the snapshot (optional).
Returns:
Dict with keys: snapshot_id, label, created_at, entry_count, path
"""
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = self._snapshot_filename(timestamp, label)
snapshot_id = filename[:-5] # strip .json
snap_path = self._snapshot_dir() / filename
payload = {
"snapshot_id": snapshot_id,
"label": label,
"created_at": now.isoformat(),
"entry_count": len(self._entries),
"archive_path": str(self.path),
"entries": [e.to_dict() for e in self._entries.values()],
}
with open(snap_path, "w") as f:
json.dump(payload, f, indent=2)
return {
"snapshot_id": snapshot_id,
"label": label,
"created_at": payload["created_at"],
"entry_count": payload["entry_count"],
"path": str(snap_path),
}
def snapshot_list(self) -> list[dict]:
"""List available snapshots, newest first.
Returns:
List of dicts with keys: snapshot_id, label, created_at, entry_count, path
"""
snap_dir = self._snapshot_dir()
snapshots = []
for snap_path in sorted(snap_dir.glob("*.json"), reverse=True):
try:
with open(snap_path) as f:
data = json.load(f)
snapshots.append({
"snapshot_id": data.get("snapshot_id", snap_path.stem),
"label": data.get("label", ""),
"created_at": data.get("created_at", ""),
"entry_count": data.get("entry_count", len(data.get("entries", []))),
"path": str(snap_path),
})
except (json.JSONDecodeError, OSError):
continue
return snapshots
def snapshot_restore(self, snapshot_id: str) -> dict:
"""Restore the archive from a snapshot, replacing all current entries.
Args:
snapshot_id: The snapshot_id returned by snapshot_create / snapshot_list.
Returns:
Dict with keys: snapshot_id, restored_count, previous_count
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
previous_count = len(self._entries)
self._entries = {}
for entry_data in data.get("entries", []):
entry = ArchiveEntry.from_dict(entry_data)
self._entries[entry.id] = entry
self._save()
return {
"snapshot_id": snapshot_id,
"restored_count": len(self._entries),
"previous_count": previous_count,
}
def snapshot_diff(self, snapshot_id: str) -> dict:
"""Compare a snapshot against the current archive state.
Args:
snapshot_id: The snapshot_id to compare against current state.
Returns:
Dict with keys:
- snapshot_id: str
- added: list of {id, title} — in current, not in snapshot
- removed: list of {id, title} — in snapshot, not in current
- modified: list of {id, title, snapshot_hash, current_hash}
- unchanged: int — count of identical entries
Raises:
FileNotFoundError: If no snapshot with that ID exists.
"""
snap_dir = self._snapshot_dir()
snap_path = snap_dir / f"{snapshot_id}.json"
if not snap_path.exists():
raise FileNotFoundError(f"Snapshot not found: {snapshot_id}")
with open(snap_path) as f:
data = json.load(f)
snap_entries: dict[str, dict] = {}
for entry_data in data.get("entries", []):
snap_entries[entry_data["id"]] = entry_data
current_ids = set(self._entries.keys())
snap_ids = set(snap_entries.keys())
added = []
for eid in current_ids - snap_ids:
e = self._entries[eid]
added.append({"id": e.id, "title": e.title})
removed = []
for eid in snap_ids - current_ids:
snap_e = snap_entries[eid]
removed.append({"id": snap_e["id"], "title": snap_e.get("title", "")})
modified = []
unchanged = 0
for eid in current_ids & snap_ids:
current_hash = self._entries[eid].content_hash
snap_hash = snap_entries[eid].get("content_hash")
if current_hash != snap_hash:
modified.append({
"id": eid,
"title": self._entries[eid].title,
"snapshot_hash": snap_hash,
"current_hash": current_hash,
})
else:
unchanged += 1
return {
"snapshot_id": snapshot_id,
"added": sorted(added, key=lambda x: x["title"]),
"removed": sorted(removed, key=lambda x: x["title"]),
"modified": sorted(modified, key=lambda x: x["title"]),
"unchanged": unchanged,
}
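# The snapshot methods compose into a create -> diff -> restore loop. A hedged
# end-to-end sketch (label illustrative, archive instance assumed):
#
#     snap = archive.snapshot_create(label="before-bulk-op")
#     # ...mutate the archive...
#     d = archive.snapshot_diff(snap["snapshot_id"])
#     print(len(d["added"]), len(d["removed"]), len(d["modified"]), d["unchanged"])
#     archive.snapshot_restore(snap["snapshot_id"])  # replaces all current entries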
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
@@ -1308,88 +972,123 @@ class MnemosyneArchive:
self._save()
return total_links
# ─── Discovery ──────────────────────────────────────────────
def discover(
def consolidate(
self,
count: int = 5,
prefer_fading: bool = True,
topic: Optional[str] = None,
similarity_threshold: float = 0.9,
dry_run: bool = False,
) -> list[dict]:
"""Serendipitous entry discovery — surface forgotten knowledge.
"""Find and merge duplicate or near-duplicate entries.
Selects entries probabilistically, weighting toward fading (low vitality)
entries when prefer_fading=True, or toward vibrant entries when False.
Optionally filter by topic.
Scans all entries for:
1. Exact duplicates: same content_hash
2. Near-duplicates: embedding similarity > threshold (when available)
Touches selected entries to boost their vitality, preventing the same
entries from being repeatedly surfaced.
When merging, the older entry is kept. Topics, links, and metadata
from the newer entry are merged into the survivor. The newer entry
is removed.
Args:
count: Number of entries to discover.
prefer_fading: If True, weight toward neglected entries. If False,
weight toward vibrant entries.
topic: Optional topic filter — only discover entries with this tag.
similarity_threshold: Minimum cosine similarity to consider
near-duplicate (default 0.9). Only used with embedding backend.
dry_run: If True, returns merge pairs without modifying the archive.
Returns:
List of dicts with keys: entry_id, title, content_preview, topics,
vitality, age_days, last_accessed
List of dicts with keys: kept_id, removed_id, reason, similarity.
"""
import random
merges = []
entries = list(self._entries.values())
removed_ids: set[str] = set()
candidates = list(self._entries.values())
# Phase 1: exact duplicates by content_hash
hash_groups: dict[str, list[ArchiveEntry]] = {}
for entry in entries:
if entry.content_hash:
hash_groups.setdefault(entry.content_hash, []).append(entry)
# Filter by topic if specified
if topic:
topic_lower = topic.lower()
candidates = [
e for e in candidates
if topic_lower in [t.lower() for t in e.topics]
]
for content_hash, group in hash_groups.items():
if len(group) < 2:
continue
group.sort(key=lambda e: e.created_at)
keeper = group[0]
for dup in group[1:]:
if dup.id in removed_ids:
continue
merges.append({
"kept_id": keeper.id,
"removed_id": dup.id,
"kept_title": keeper.title,
"removed_title": dup.title,
"reason": "exact_content_hash",
"similarity": 1.0,
})
removed_ids.add(dup.id)
if not candidates:
return []
# Phase 2: near-duplicates via embedding similarity
if self._embedding_backend is not None:
active = [e for e in entries if e.id not in removed_ids]
for i, a in enumerate(active):
if a.id in removed_ids:
continue
vec_a = self.linker._get_embedding(a)
if not vec_a:
continue
for b in active[i + 1:]:
if b.id in removed_ids:
continue
vec_b = self.linker._get_embedding(b)
if not vec_b:
continue
sim = self._embedding_backend.similarity(vec_a, vec_b)
if sim >= similarity_threshold:
if a.created_at <= b.created_at:
keeper, loser = a, b
else:
keeper, loser = b, a
merges.append({
"kept_id": keeper.id,
"removed_id": loser.id,
"kept_title": keeper.title,
"removed_title": loser.title,
"reason": "embedding_similarity",
"similarity": round(sim, 4),
})
removed_ids.add(loser.id)
# Compute vitality for each candidate
scored = []
for entry in candidates:
v = self._compute_vitality(entry)
scored.append((entry, v))
if dry_run:
return merges
# Build selection weights
if prefer_fading:
# Lower vitality = higher weight. Invert and normalize.
weights = [max(0.01, 1.0 - v) for _, v in scored]
else:
# Higher vitality = higher weight
weights = [max(0.01, v) for _, v in scored]
# Execute merges
for merge in merges:
keeper = self._entries.get(merge["kept_id"])
loser = self._entries.get(merge["removed_id"])
if keeper is None or loser is None:
continue
# Sample without replacement
k = min(count, len(scored))
selected_indices = random.choices(range(len(scored)), weights=weights, k=k)
# Deduplicate while preserving order
seen = set()
unique_indices = []
for idx in selected_indices:
if idx not in seen:
seen.add(idx)
unique_indices.append(idx)
for topic in loser.topics:
if topic not in keeper.topics:
keeper.topics.append(topic)
results = []
for idx in unique_indices:
entry, v = scored[idx]
# Touch to boost vitality
self.touch(entry.id)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
results.append({
"entry_id": entry.id,
"title": entry.title,
"content_preview": entry.content[:200] + "..." if len(entry.content) > 200 else entry.content,
"topics": entry.topics,
"vitality": round(v, 4),
"age_days": age_days,
"last_accessed": entry.last_accessed,
})
for link_id in loser.links:
if link_id != keeper.id and link_id not in keeper.links:
keeper.links.append(link_id)
for key, value in loser.metadata.items():
if key not in keeper.metadata:
keeper.metadata[key] = value
keeper.updated_at = datetime.now(timezone.utc).isoformat()
del self._entries[loser.id]
for entry in self._entries.values():
if merge["removed_id"] in entry.links:
entry.links.remove(merge["removed_id"])
if merge["kept_id"] not in entry.links and merge["kept_id"] != entry.id:
entry.links.append(merge["kept_id"])
if merges:
self._save()
return merges
return results
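# A subtlety in the removed discover() above: random.choices samples WITH
# replacement, so the order-preserving dedup yields at most `count` distinct
# entries, possibly fewer. The trick in isolation (standalone sketch):
#
#     import random
#
#     def weighted_pick(items, weights, k):
#         floored = [max(0.01, w) for w in weights]
#         drawn = random.choices(range(len(items)), weights=floored, k=min(k, len(items)))
#         picks, seen = [], set()
#         for i in drawn:
#             if i not in seen:
#                 seen.add(i)
#                 picks.append(items[i])
#         return picks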

nexus/mnemosyne/cli.py
View File

@@ -4,11 +4,7 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne topics, mnemosyne remove, mnemosyne export,
mnemosyne clusters, mnemosyne hubs, mnemosyne bridges, mnemosyne rebuild,
mnemosyne tag, mnemosyne untag, mnemosyne retag,
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff
mnemosyne discover [-n COUNT] [-t TOPIC] [--vibrant]
mnemosyne timeline, mnemosyne neighbors
"""
from __future__ import annotations
@@ -158,6 +154,23 @@ def cmd_rebuild(args):
print(f"Rebuilt links: {total} connections across {archive.count} entries")
def cmd_consolidate(args):
archive = MnemosyneArchive()
threshold = args.threshold
merges = archive.consolidate(similarity_threshold=threshold, dry_run=args.dry_run)
if not merges:
print("No duplicates found.")
return
action = "Would merge" if args.dry_run else "Merged"
print(f"{action} {len(merges)} pair(s):\n")
for m in merges:
sim = m["similarity"]
reason = m["reason"]
print(f" [{reason}] {m['kept_title'][:60]}")
print(f" kept: {m['kept_id'][:8]}")
print(f" removed: {m['removed_id'][:8]} (similarity: {sim})\n")
def cmd_tag(args):
archive = MnemosyneArchive()
tags = [t.strip() for t in args.tags.split(",") if t.strip()]
@@ -210,38 +223,6 @@ def cmd_timeline(args):
print()
def cmd_path(args):
archive = MnemosyneArchive(archive_path=args.archive) if args.archive else MnemosyneArchive()
path = archive.shortest_path(args.start, args.end)
if path is None:
print(f"No path found between {args.start} and {args.end}")
return
steps = archive.path_explanation(path)
print(f"Path ({len(steps)} hops):")
for i, step in enumerate(steps):
arrow = "" if i > 0 else " "
print(f"{arrow}{step['id']}: {step['title']}")
if step['topics']:
print(f" topics: {', '.join(step['topics'])}")
def cmd_consolidate(args):
archive = MnemosyneArchive()
merges = archive.consolidate(threshold=args.threshold, dry_run=args.dry_run)
if not merges:
print("No duplicates found.")
return
label = "[DRY RUN] " if args.dry_run else ""
for m in merges:
print(f"{label}Merge ({m['reason']}, score={m['score']:.4f}):")
print(f" kept: {m['kept'][:8]}")
print(f" removed: {m['removed'][:8]}")
if args.dry_run:
print(f"\n{len(merges)} pair(s) would be merged. Re-run without --dry-run to apply.")
else:
print(f"\nMerged {len(merges)} duplicate pair(s).")
def cmd_neighbors(args):
archive = MnemosyneArchive()
try:
@@ -258,144 +239,6 @@ def cmd_neighbors(args):
print()
def cmd_touch(args):
archive = MnemosyneArchive()
try:
entry = archive.touch(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
v = archive.get_vitality(entry.id)
print(f"[{entry.id[:8]}] {entry.title}")
print(f" Vitality: {v['vitality']:.4f} (boosted)")
def cmd_decay(args):
archive = MnemosyneArchive()
result = archive.apply_decay()
print(f"Applied decay to {result['total_entries']} entries")
print(f" Decayed: {result['decayed_count']}")
print(f" Avg vitality: {result['avg_vitality']:.4f}")
print(f" Fading (<0.3): {result['fading_count']}")
print(f" Vibrant (>0.7): {result['vibrant_count']}")
def cmd_vitality(args):
archive = MnemosyneArchive()
try:
v = archive.get_vitality(args.entry_id)
except KeyError:
print(f"Entry not found: {args.entry_id}")
sys.exit(1)
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f}")
print(f" Last accessed: {v['last_accessed'] or 'never'}")
print(f" Age: {v['age_days']} days")
def cmd_fading(args):
archive = MnemosyneArchive()
results = archive.fading(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def cmd_snapshot(args):
archive = MnemosyneArchive()
if args.snapshot_cmd == "create":
result = archive.snapshot_create(label=args.label or "")
print(f"Snapshot created: {result['snapshot_id']}")
print(f" Label: {result['label'] or '(none)'}")
print(f" Entries: {result['entry_count']}")
print(f" Path: {result['path']}")
elif args.snapshot_cmd == "list":
snapshots = archive.snapshot_list()
if not snapshots:
print("No snapshots found.")
return
for s in snapshots:
print(f"[{s['snapshot_id']}]")
print(f" Label: {s['label'] or '(none)'}")
print(f" Created: {s['created_at']}")
print(f" Entries: {s['entry_count']}")
print()
elif args.snapshot_cmd == "restore":
try:
result = archive.snapshot_restore(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Restored from snapshot: {result['snapshot_id']}")
print(f" Entries restored: {result['restored_count']}")
print(f" Previous count: {result['previous_count']}")
elif args.snapshot_cmd == "diff":
try:
diff = archive.snapshot_diff(args.snapshot_id)
except FileNotFoundError as e:
print(str(e))
sys.exit(1)
print(f"Diff vs snapshot: {diff['snapshot_id']}")
print(f" Added ({len(diff['added'])}): ", end="")
if diff["added"]:
print()
for e in diff["added"]:
print(f" + [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Removed ({len(diff['removed'])}): ", end="")
if diff["removed"]:
print()
for e in diff["removed"]:
print(f" - [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Modified({len(diff['modified'])}): ", end="")
if diff["modified"]:
print()
for e in diff["modified"]:
print(f" ~ [{e['id'][:8]}] {e['title']}")
else:
print("none")
print(f" Unchanged: {diff['unchanged']}")
else:
print(f"Unknown snapshot subcommand: {args.snapshot_cmd}")
sys.exit(1)
def cmd_discover(args):
archive = MnemosyneArchive()
results = archive.discover(
count=args.count,
prefer_fading=not args.vibrant,
topic=args.topic if args.topic else None,
)
if not results:
print("No entries found." + (" (topic filter too narrow?)" if args.topic else ""))
return
for r in results:
print(f"[{r['entry_id'][:8]}] {r['title']}")
print(f" Topics: {', '.join(r['topics'])} | Vitality: {r['vitality']} | Age: {r['age_days']}d")
print(f" {r['content_preview']}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
if not results:
print("Archive is empty.")
return
for v in results:
print(f"[{v['entry_id'][:8]}] {v['title']}")
print(f" Vitality: {v['vitality']:.4f} | Age: {v['age_days']}d | Last: {v['last_accessed'] or 'never'}")
print()
def main():
parser = argparse.ArgumentParser(prog="mnemosyne", description="The Living Holographic Archive")
sub = parser.add_subparsers(dest="command")
@@ -437,6 +280,10 @@ def main():
rb = sub.add_parser("rebuild", help="Recompute all links from scratch")
rb.add_argument("-t", "--threshold", type=float, default=None, help="Similarity threshold override")
co = sub.add_parser("consolidate", help="Find and merge duplicate/near-duplicate entries")
co.add_argument("-t", "--threshold", type=float, default=0.9, help="Similarity threshold for near-duplicates (default: 0.9)")
co.add_argument("--dry-run", action="store_true", help="Show what would merge without modifying")
tg = sub.add_parser("tag", help="Add tags to an existing entry")
tg.add_argument("entry_id", help="Entry ID")
tg.add_argument("tags", help="Comma-separated tags to add")
@@ -457,53 +304,10 @@ def main():
nb.add_argument("entry_id", help="Anchor entry ID")
nb.add_argument("--days", type=int, default=7, help="Window in days (default: 7)")
pa = sub.add_parser("path", help="Find shortest path between two memories")
pa.add_argument("start", help="Starting entry ID")
pa.add_argument("end", help="Target entry ID")
pa.add_argument("--archive", default=None, help="Archive path")
co = sub.add_parser("consolidate", help="Merge duplicate/near-duplicate entries")
co.add_argument("--dry-run", action="store_true", help="Show what would be merged without applying")
co.add_argument("--threshold", type=float, default=0.9, help="Similarity threshold (default: 0.9)")
tc = sub.add_parser("touch", help="Boost an entry's vitality by accessing it")
tc.add_argument("entry_id", help="Entry ID to touch")
dc = sub.add_parser("decay", help="Apply time-based decay to all entries")
vy = sub.add_parser("vitality", help="Show an entry's vitality status")
vy.add_argument("entry_id", help="Entry ID to check")
fg = sub.add_parser("fading", help="Show most neglected entries (lowest vitality)")
fg.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
dc = sub.add_parser("discover", help="Serendipitous entry discovery")
dc.add_argument("-n", "--count", type=int, default=5, help="Number of entries to discover")
dc.add_argument("-t", "--topic", default=None, help="Filter by topic")
dc.add_argument("--vibrant", action="store_true", help="Prefer vibrant (alive) entries over fading ones")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
sn_sub.add_parser("list", help="List available snapshots")
sn_restore = sn_sub.add_parser("restore", help="Restore archive from a snapshot")
sn_restore.add_argument("snapshot_id", help="Snapshot ID to restore")
sn_diff = sn_sub.add_parser("diff", help="Show what changed since a snapshot")
sn_diff.add_argument("snapshot_id", help="Snapshot ID to compare against")
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "snapshot" and not args.snapshot_cmd:
sn.print_help()
sys.exit(1)
dispatch = {
"stats": cmd_stats,
@@ -517,20 +321,12 @@ def main():
"hubs": cmd_hubs,
"bridges": cmd_bridges,
"rebuild": cmd_rebuild,
"consolidate": cmd_consolidate,
"tag": cmd_tag,
"untag": cmd_untag,
"retag": cmd_retag,
"timeline": cmd_timeline,
"neighbors": cmd_neighbors,
"consolidate": cmd_consolidate,
"path": cmd_path,
"touch": cmd_touch,
"decay": cmd_decay,
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"snapshot": cmd_snapshot,
"discover": cmd_discover,
}
dispatch[args.command](args)
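# For scripting or tests, cmd_consolidate can be driven without the shell by
# handing it a namespace carrying the two attributes it reads. A minimal
# sketch, assuming cli.py is importable as nexus.mnemosyne.cli:
#
#     import argparse
#     from nexus.mnemosyne.cli import cmd_consolidate
#
#     # equivalent to: mnemosyne consolidate --dry-run --threshold 0.9
#     cmd_consolidate(argparse.Namespace(threshold=0.9, dry_run=True))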

View File

@@ -1,138 +0,0 @@
"""Tests for Mnemosyne CLI commands — path, touch, decay, vitality, fading, vibrant."""
import json
import tempfile
from pathlib import Path
from unittest.mock import patch
import sys
import io
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
@pytest.fixture
def archive(tmp_path):
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path)
@pytest.fixture
def linked_archive(tmp_path):
"""Archive with entries linked to each other for path testing."""
path = tmp_path / "test_archive.json"
arch = MnemosyneArchive(archive_path=path, auto_embed=False)
e1 = arch.add(ArchiveEntry(title="Alpha", content="first entry about python", topics=["code"]))
e2 = arch.add(ArchiveEntry(title="Beta", content="second entry about python coding", topics=["code"]))
e3 = arch.add(ArchiveEntry(title="Gamma", content="third entry about cooking recipes", topics=["food"]))
return arch, e1, e2, e3
class TestPathCommand:
def test_shortest_path_exists(self, linked_archive):
arch, e1, e2, e3 = linked_archive
path = arch.shortest_path(e1.id, e2.id)
assert path is not None
assert path[0] == e1.id
assert path[-1] == e2.id
def test_shortest_path_no_connection(self, linked_archive):
arch, e1, e2, e3 = linked_archive
# e3 (cooking) likely not linked to e1 (python coding)
path = arch.shortest_path(e1.id, e3.id)
# Path may or may not exist depending on the linking threshold
assert path is None or (path[0] == e1.id and path[-1] == e3.id)
def test_shortest_path_same_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, e1.id)
assert path == [e1.id]
def test_shortest_path_missing_entry(self, linked_archive):
arch, e1, _, _ = linked_archive
path = arch.shortest_path(e1.id, "nonexistent-id")
assert path is None
class TestTouchCommand:
def test_touch_boosts_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
# Simulate time passing by setting old last_accessed
old_time = "2020-01-01T00:00:00+00:00"
entry.last_accessed = old_time
entry.vitality = 0.5
archive._save()
touched = archive.touch(entry.id)
assert touched.vitality > 0.5
assert touched.last_accessed != old_time
def test_touch_missing_entry(self, archive):
with pytest.raises(KeyError):
archive.touch("nonexistent-id")
class TestDecayCommand:
def test_apply_decay_returns_stats(self, archive):
archive.add(ArchiveEntry(title="Test", content="Content"))
result = archive.apply_decay()
assert result["total_entries"] == 1
assert "avg_vitality" in result
assert "fading_count" in result
assert "vibrant_count" in result
def test_decay_on_empty_archive(self, archive):
result = archive.apply_decay()
assert result["total_entries"] == 0
assert result["avg_vitality"] == 0.0
class TestVitalityCommand:
def test_get_vitality(self, archive):
entry = archive.add(ArchiveEntry(title="Test", content="Content"))
v = archive.get_vitality(entry.id)
assert v["entry_id"] == entry.id
assert v["title"] == "Test"
assert 0.0 <= v["vitality"] <= 1.0
assert v["age_days"] >= 0
def test_get_vitality_missing(self, archive):
with pytest.raises(KeyError):
archive.get_vitality("nonexistent-id")
class TestFadingVibrant:
def test_fading_returns_sorted_ascending(self, archive):
# Add entries with different vitalities
e1 = archive.add(ArchiveEntry(title="Vibrant", content="High energy"))
e2 = archive.add(ArchiveEntry(title="Fading", content="Low energy"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.fading(limit=10)
assert len(results) == 2
assert results[0]["vitality"] <= results[1]["vitality"]
def test_vibrant_returns_sorted_descending(self, archive):
e1 = archive.add(ArchiveEntry(title="Fresh", content="New"))
e2 = archive.add(ArchiveEntry(title="Old", content="Ancient"))
e2.vitality = 0.1
e2.last_accessed = "2020-01-01T00:00:00+00:00"
archive._save()
results = archive.vibrant(limit=10)
assert len(results) == 2
assert results[0]["vitality"] >= results[1]["vitality"]
def test_fading_limit(self, archive):
for i in range(15):
archive.add(ArchiveEntry(title=f"Entry {i}", content=f"Content {i}"))
results = archive.fading(limit=5)
assert len(results) == 5
def test_vibrant_empty(self, archive):
results = archive.vibrant()
assert results == []

tests/test_consolidation.py
View File

@@ -1,176 +1,137 @@
"""Tests for MnemosyneArchive.consolidate() — duplicate/near-duplicate merging."""
"""Tests for MnemosyneArchive.consolidate()."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
def _archive(tmp: str) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=Path(tmp) / "archive.json", auto_embed=False)
@pytest.fixture
def archive(tmp_path):
"""Create an archive with auto_embed disabled for deterministic tests."""
path = tmp_path / "test_archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
def test_consolidate_exact_duplicate_removed():
"""Two entries with identical content_hash are merged; only one survives."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Hello world", content="Exactly the same content", topics=["a"])
# Manually add a second entry with the same hash to simulate a duplicate
e2 = ArchiveEntry(title="Hello world", content="Exactly the same content", topics=["b"])
# Bypass dedup guard so we can test consolidate() rather than add()
archive._entries[e2.id] = e2
archive._save()
class TestConsolidateExactDuplicates:
"""Phase 1: exact duplicate detection by content_hash."""
assert archive.count == 2
merges = archive.consolidate(dry_run=False)
def test_finds_exact_duplicates(self, archive):
entry_a = ArchiveEntry(title="Hello", content="World")
entry_b = ArchiveEntry(title="Hello", content="World")
archive.add(entry_a, auto_link=False)
archive.add(entry_b, auto_link=False)
# Force same content_hash
entry_b.content_hash = entry_a.content_hash
# Re-add entry_b manually (bypass add() dedup)
archive._entries[entry_b.id] = entry_b
merges = archive.consolidate()
assert len(merges) == 1
assert merges[0]["reason"] == "exact_hash"
assert merges[0]["score"] == 1.0
assert archive.count == 1
assert merges[0]["reason"] == "exact_content_hash"
assert merges[0]["similarity"] == 1.0
def test_keeps_older_entry(self, archive):
entry_a = ArchiveEntry(title="First", content="Data", created_at="2024-01-01T00:00:00+00:00")
entry_b = ArchiveEntry(title="Second", content="Data", created_at="2024-06-01T00:00:00+00:00")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
def test_consolidate_keeps_older_entry():
"""The older entry (earlier created_at) is kept, the newer is removed."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Hello world", content="Same content here", topics=[])
e2 = ArchiveEntry(title="Hello world", content="Same content here", topics=[])
# Make e2 clearly newer
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
merges = archive.consolidate()
assert merges[0]["kept_id"] == entry_a.id
assert merges[0]["removed_id"] == entry_b.id
merges = archive.consolidate(dry_run=False)
assert len(merges) == 1
assert merges[0]["kept"] == e1.id
assert merges[0]["removed"] == e2.id
def test_consolidate_merges_topics():
"""Topics from the removed entry are merged (unioned) into the kept entry."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ingest_event(archive, title="Memory item", content="Shared content body", topics=["alpha"])
e2 = ArchiveEntry(title="Memory item", content="Shared content body", topics=["beta", "gamma"])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor is not None
topic_lower = {t.lower() for t in survivor.topics}
assert "alpha" in topic_lower
assert "beta" in topic_lower
assert "gamma" in topic_lower
def test_consolidate_merges_metadata():
"""Metadata from the removed entry is merged into the kept entry; kept values win."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
e1 = ArchiveEntry(
title="Shared", content="Identical body here", topics=[], metadata={"k1": "v1", "shared": "kept"}
)
archive._entries[e1.id] = e1
e2 = ArchiveEntry(
title="Shared", content="Identical body here", topics=[], metadata={"k2": "v2", "shared": "removed"}
)
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor.metadata["k1"] == "v1"
assert survivor.metadata["k2"] == "v2"
assert survivor.metadata["shared"] == "kept" # kept entry wins
def test_consolidate_dry_run_no_mutation():
"""Dry-run mode returns merge plan but does not alter the archive."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
ingest_event(archive, title="Same", content="Identical content to dedup", topics=[])
e2 = ArchiveEntry(title="Same", content="Identical content to dedup", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
def test_dry_run_does_not_modify(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
count_before = archive.count
merges = archive.consolidate(dry_run=True)
assert len(merges) == 1
assert merges[0]["dry_run"] is True
# Archive must be unchanged
assert archive.count == 2
assert archive.count == count_before # unchanged
def test_consolidate_no_duplicates():
"""When no duplicates exist, consolidate returns an empty list."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
ingest_event(archive, title="Unique A", content="This is completely unique content for A")
ingest_event(archive, title="Unique B", content="Totally different words here for B")
merges = archive.consolidate(threshold=0.9)
def test_no_duplicates_returns_empty(self, archive):
archive.add(ArchiveEntry(title="Unique A", content="Content A"), auto_link=False)
archive.add(ArchiveEntry(title="Unique B", content="Content B"), auto_link=False)
merges = archive.consolidate()
assert merges == []
def test_merges_topics(self, archive):
entry_a = ArchiveEntry(title="A", content="Data", topics=["python"])
entry_b = ArchiveEntry(title="B", content="Data", topics=["testing"])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
def test_consolidate_transfers_links():
"""Links from the removed entry are inherited by the kept entry."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
# Create a third entry to act as a link target
target = ingest_event(archive, title="Target", content="The link target entry", topics=[])
archive.consolidate()
keeper = archive.get(entry_a.id)
assert "python" in keeper.topics
assert "testing" in keeper.topics
e1 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[], links=[target.id])
archive._entries[e1.id] = e1
target.links.append(e1.id)
def test_merges_links(self, archive):
entry_c = ArchiveEntry(title="C", content="Ref")
archive.add(entry_c, auto_link=False)
e2 = ArchiveEntry(title="Dup", content="Exact duplicate body text", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
entry_a = ArchiveEntry(title="A", content="Data", links=[entry_c.id])
entry_b = ArchiveEntry(title="B", content="Data", links=[entry_c.id])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate(dry_run=False)
survivor = archive.get(e1.id)
assert survivor is not None
assert target.id in survivor.links
archive.consolidate()
keeper = archive.get(entry_a.id)
assert entry_c.id in keeper.links
def test_removes_duplicate_from_archive(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
assert archive.get(entry_a.id) is not None
assert archive.get(entry_b.id) is None
def test_fixes_links_pointing_to_removed(self, archive):
entry_a = ArchiveEntry(title="A", content="Same")
entry_b = ArchiveEntry(title="B", content="Same")
entry_c = ArchiveEntry(title="C", content="Ref", links=[entry_b.id])
entry_b.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive.add(entry_c, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive.consolidate()
survivor = archive.get(entry_c.id)
assert entry_b.id not in survivor.links
assert entry_a.id in survivor.links
def test_consolidate_near_duplicate_semantic():
"""Near-duplicate entries above the similarity threshold are merged."""
with tempfile.TemporaryDirectory() as tmp:
archive = _archive(tmp)
# Entries with very high Jaccard overlap
text_a = "python automation scripting building tools workflows"
text_b = "python automation scripting building tools workflows tasks"
e1 = ArchiveEntry(title="Automator", content=text_a, topics=[])
e2 = ArchiveEntry(title="Automator", content=text_b, topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e1.id] = e1
archive._entries[e2.id] = e2
archive._save()
class TestConsolidateTripleDuplicates:
"""Handle 3+ entries with the same content_hash."""
# Use a low threshold to ensure these very similar entries match
merges = archive.consolidate(threshold=0.7, dry_run=False)
assert len(merges) >= 1
assert merges[0]["reason"] == "semantic_similarity"
def test_three_way_merge(self, archive):
entry_a = ArchiveEntry(title="A", content="Same", created_at="2024-01-01T00:00:00+00:00")
entry_b = ArchiveEntry(title="B", content="Same", created_at="2024-02-01T00:00:00+00:00")
entry_c = ArchiveEntry(title="C", content="Same", created_at="2024-03-01T00:00:00+00:00")
entry_b.content_hash = entry_a.content_hash
entry_c.content_hash = entry_a.content_hash
archive.add(entry_a, auto_link=False)
archive._entries[entry_b.id] = entry_b
archive._entries[entry_c.id] = entry_c
def test_consolidate_persists_after_reload():
"""After consolidation, the reduced archive survives a save/reload cycle."""
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(archive, title="Persist test", content="Body to dedup and persist", topics=[])
e2 = ArchiveEntry(title="Persist test", content="Body to dedup and persist", topics=[])
e2.created_at = "2099-01-01T00:00:00+00:00"
archive._entries[e2.id] = e2
archive._save()
archive.consolidate(dry_run=False)
assert archive.count == 1
reloaded = MnemosyneArchive(archive_path=path, auto_embed=False)
assert reloaded.count == 1
merges = archive.consolidate()
assert len(merges) == 2
assert all(m["kept_id"] == entry_a.id for m in merges)

View File

@@ -1,85 +0,0 @@
"""Tests for Mnemosyne discover functionality."""
import tempfile
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive_with_entries():
"""Helper: create an archive with test entries."""
path = Path(tempfile.mkdtemp()) / "test_archive.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(archive, title="Python automation", content="Building tools in Python", topics=["python", "automation"])
ingest_event(archive, title="Cooking pasta", content="How to make carbonara", topics=["cooking"])
ingest_event(archive, title="Bitcoin basics", content="Understanding Bitcoin and blockchain", topics=["bitcoin", "crypto"])
ingest_event(archive, title="AI agents", content="Building autonomous AI agents", topics=["ai", "agents"])
ingest_event(archive, title="Meditation guide", content="Mindfulness and meditation techniques", topics=["wellness"])
return archive
def test_discover_returns_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=3)
assert len(results) == 3
for r in results:
assert "entry_id" in r
assert "title" in r
assert "content_preview" in r
assert "topics" in r
assert "vitality" in r
assert "age_days" in r
def test_discover_respects_count():
archive = _make_archive_with_entries()
results = archive.discover(count=2)
assert len(results) == 2
def test_discover_count_exceeds_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=100)
assert len(results) == archive.count
def test_discover_topic_filter():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="python")
assert len(results) == 1
assert results[0]["title"] == "Python automation"
def test_discover_topic_case_insensitive():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="Python")
assert len(results) == 1
def test_discover_empty_topic_returns_nothing():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="nonexistent")
assert len(results) == 0
def test_discover_boosts_vitality():
archive = _make_archive_with_entries()
# Get initial vitality
before = archive.fading(limit=5)
# Discover (which touches entries)
archive.discover(count=3)
# The touched entries should have higher vitality now
after = archive.fading(limit=5)
# At least some entries should have changed vitality
before_vitals = {e["entry_id"]: e["vitality"] for e in before}
after_vitals = {e["entry_id"]: e["vitality"] for e in after}
changed = sum(1 for eid in before_vitals if eid in after_vitals and abs(before_vitals[eid] - after_vitals[eid]) > 0.001)
assert changed >= 1, "Discover should touch and boost vitality of selected entries"
def test_discover_empty_archive():
path = Path(tempfile.mkdtemp()) / "empty.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
results = archive.discover(count=5)
assert len(results) == 0

tests/test_path.py
View File

@@ -1,106 +0,0 @@
"""Tests for MnemosyneArchive.shortest_path and path_explanation."""
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def _make_archive(tmp_path):
archive = MnemosyneArchive(str(tmp_path / "test_archive.json"))
return archive
class TestShortestPath:
def test_direct_connection(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "first entry", topics=["start"])
b = archive.add("Beta", "second entry", topics=["end"])
# Manually link
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = archive.shortest_path(a.id, b.id)
assert path == [a.id, b.id]
def test_multi_hop_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "alpha", topics=["x"])
b = archive.add("B", "beta", topics=["y"])
c = archive.add("C", "gamma", topics=["z"])
# Chain: A -> B -> C
a.links.append(b.id)
b.links.extend([a.id, c.id])
c.links.append(b.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._entries[c.id] = c
archive._save()
path = archive.shortest_path(a.id, c.id)
assert path == [a.id, b.id, c.id]
def test_no_path(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "isolated", topics=[])
b = archive.add("B", "also isolated", topics=[])
path = archive.shortest_path(a.id, b.id)
assert path is None
def test_same_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "lonely", topics=[])
path = archive.shortest_path(a.id, a.id)
assert path == [a.id]
def test_nonexistent_entry(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "exists", topics=[])
path = archive.shortest_path("fake-id", a.id)
assert path is None
def test_shortest_of_multiple(self, tmp_path):
"""When multiple paths exist, BFS returns shortest."""
archive = _make_archive(tmp_path)
a = archive.add("A", "a", topics=[])
b = archive.add("B", "b", topics=[])
c = archive.add("C", "c", topics=[])
d = archive.add("D", "d", topics=[])
# A -> B -> D (short)
# A -> C -> B -> D (long)
a.links.extend([b.id, c.id])
b.links.extend([a.id, d.id, c.id])
c.links.extend([a.id, b.id])
d.links.append(b.id)
for e in [a, b, c, d]:
archive._entries[e.id] = e
archive._save()
path = archive.shortest_path(a.id, d.id)
assert len(path) == 3 # A -> B -> D, not A -> C -> B -> D
class TestPathExplanation:
def test_returns_step_details(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("Alpha", "the beginning", topics=["origin"])
b = archive.add("Beta", "the middle", topics=["process"])
a.links.append(b.id)
b.links.append(a.id)
archive._entries[a.id] = a
archive._entries[b.id] = b
archive._save()
path = [a.id, b.id]
steps = archive.path_explanation(path)
assert len(steps) == 2
assert steps[0]["title"] == "Alpha"
assert steps[1]["title"] == "Beta"
assert "origin" in steps[0]["topics"]
def test_content_preview_truncation(self, tmp_path):
archive = _make_archive(tmp_path)
a = archive.add("A", "x" * 200, topics=[])
steps = archive.path_explanation([a.id])
assert len(steps[0]["content_preview"]) <= 123 # 120 + "..."

View File

@@ -1,240 +0,0 @@
"""Tests for Mnemosyne snapshot (point-in-time backup/restore) feature."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive(tmp_dir: str) -> MnemosyneArchive:
path = Path(tmp_dir) / "archive.json"
return MnemosyneArchive(archive_path=path, auto_embed=False)
# ─── snapshot_create ─────────────────────────────────────────────────────────
def test_snapshot_create_returns_metadata():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Alpha", content="First entry", topics=["a"])
ingest_event(archive, title="Beta", content="Second entry", topics=["b"])
result = archive.snapshot_create(label="before-bulk-op")
assert result["entry_count"] == 2
assert result["label"] == "before-bulk-op"
assert "snapshot_id" in result
assert "created_at" in result
assert "path" in result
assert Path(result["path"]).exists()
def test_snapshot_create_no_label():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Gamma", content="Third entry", topics=[])
result = archive.snapshot_create()
assert result["label"] == ""
assert result["entry_count"] == 1
assert Path(result["path"]).exists()
def test_snapshot_file_contains_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Delta", content="Fourth entry", topics=["d"])
result = archive.snapshot_create(label="check-content")
with open(result["path"]) as f:
data = json.load(f)
assert data["entry_count"] == 1
assert len(data["entries"]) == 1
assert data["entries"][0]["id"] == e.id
assert data["entries"][0]["title"] == "Delta"
def test_snapshot_create_empty_archive():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
result = archive.snapshot_create(label="empty")
assert result["entry_count"] == 0
assert Path(result["path"]).exists()
# ─── snapshot_list ───────────────────────────────────────────────────────────
def test_snapshot_list_empty():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
assert archive.snapshot_list() == []
def test_snapshot_list_returns_all():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="One", content="c1", topics=[])
archive.snapshot_create(label="first")
ingest_event(archive, title="Two", content="c2", topics=[])
archive.snapshot_create(label="second")
snapshots = archive.snapshot_list()
assert len(snapshots) == 2
labels = {s["label"] for s in snapshots}
assert "first" in labels
assert "second" in labels
def test_snapshot_list_metadata_fields():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="meta-check")
snapshots = archive.snapshot_list()
s = snapshots[0]
for key in ("snapshot_id", "label", "created_at", "entry_count", "path"):
assert key in s
def test_snapshot_list_newest_first():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
archive.snapshot_create(label="a")
archive.snapshot_create(label="b")
snapshots = archive.snapshot_list()
# Filenames sort lexicographically; newest (b) should be first
# (filenames include timestamp so alphabetical = newest-last;
# snapshot_list reverses the glob order → newest first)
assert len(snapshots) == 2
# Both should be present; ordering is newest first
ids = [s["snapshot_id"] for s in snapshots]
assert ids == sorted(ids, reverse=True)
# ─── snapshot_restore ────────────────────────────────────────────────────────
def test_snapshot_restore_replaces_entries():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Kept", content="original content", topics=["orig"])
snap = archive.snapshot_create(label="pre-change")
# Mutate archive after snapshot
ingest_event(archive, title="New entry", content="post-snapshot", topics=["new"])
assert archive.count == 2
result = archive.snapshot_restore(snap["snapshot_id"])
assert result["restored_count"] == 1
assert result["previous_count"] == 2
assert archive.count == 1
entry = list(archive._entries.values())[0]
assert entry.title == "Kept"
def test_snapshot_restore_persists_to_disk():
with tempfile.TemporaryDirectory() as tmp:
path = Path(tmp) / "archive.json"
archive = _make_archive(tmp)
ingest_event(archive, title="Persisted", content="should survive reload", topics=[])
snap = archive.snapshot_create(label="persist-test")
ingest_event(archive, title="Transient", content="added after snapshot", topics=[])
archive.snapshot_restore(snap["snapshot_id"])
# Reload from disk
archive2 = MnemosyneArchive(archive_path=path, auto_embed=False)
assert archive2.count == 1
assert list(archive2._entries.values())[0].title == "Persisted"
def test_snapshot_restore_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_restore("nonexistent_snapshot_id")
# ─── snapshot_diff ───────────────────────────────────────────────────────────
def test_snapshot_diff_no_changes():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Stable", content="unchanged content", topics=[])
snap = archive.snapshot_create(label="baseline")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["added"] == []
assert diff["removed"] == []
assert diff["modified"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_added():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
ingest_event(archive, title="Original", content="existing", topics=[])
snap = archive.snapshot_create(label="before-add")
ingest_event(archive, title="Newcomer", content="added after", topics=[])
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["added"]) == 1
assert diff["added"][0]["title"] == "Newcomer"
assert diff["removed"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_removed():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e1 = ingest_event(archive, title="Will Be Removed", content="doomed", topics=[])
ingest_event(archive, title="Survivor", content="stays", topics=[])
snap = archive.snapshot_create(label="pre-removal")
archive.remove(e1.id)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["removed"]) == 1
assert diff["removed"][0]["title"] == "Will Be Removed"
assert diff["added"] == []
assert diff["unchanged"] == 1
def test_snapshot_diff_detects_modified():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
e = ingest_event(archive, title="Mutable", content="original content", topics=[])
snap = archive.snapshot_create(label="pre-edit")
archive.update_entry(e.id, content="updated content", auto_link=False)
diff = archive.snapshot_diff(snap["snapshot_id"])
assert len(diff["modified"]) == 1
assert diff["modified"][0]["title"] == "Mutable"
assert diff["modified"][0]["snapshot_hash"] != diff["modified"][0]["current_hash"]
assert diff["added"] == []
assert diff["removed"] == []
def test_snapshot_diff_missing_raises():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
with pytest.raises(FileNotFoundError):
archive.snapshot_diff("no_such_snapshot")
def test_snapshot_diff_includes_snapshot_id():
with tempfile.TemporaryDirectory() as tmp:
archive = _make_archive(tmp)
snap = archive.snapshot_create(label="id-check")
diff = archive.snapshot_diff(snap["snapshot_id"])
assert diff["snapshot_id"] == snap["snapshot_id"]