Compare commits

..

4 Commits

Author SHA1 Message Date
b08df4f79d docs: add discover to CLI docstring
Some checks failed
CI / test (pull_request) Failing after 7s
CI / validate (pull_request) Failing after 11s
Review Approval Gate / verify-review (pull_request) Failing after 3s
2026-04-12 10:29:04 +00:00
7b3f8f09df test: add tests for discover() functionality
8 tests covering:
- Basic discovery returns entries
- Count limits
- Topic filtering (case-insensitive)
- Empty archive handling
- Vitality boost via touch
2026-04-12 10:28:39 +00:00
af297927a2 feat: add CLI commands for discover
- mnemosyne discover [-n COUNT] [-t TOPIC] [--vibrant]
- Serendipitous entry exploration weighted by vitality decay
2026-04-12 10:28:05 +00:00
46eeca9ad1 feat: add discover() for serendipitous entry exploration
Probabilistic entry selection weighted by vitality decay.
Fading entries surface more often, vibrant ones less.
Touches selected entries to boost their vitality.
Optional topic filter.
2026-04-12 10:27:33 +00:00
18 changed files with 351 additions and 563 deletions

View File

@@ -177,7 +177,7 @@ The rule is:
- rescue good work from legacy Matrix
- rebuild inside `the-nexus`
- keep telemetry and durable truth flowing through the Hermes harness
- Hermes is the sole harness — no external gateway dependencies
- keep OpenClaw as a sidecar, not the authority
## Verified historical browser-world snapshot

10
app.js
View File

@@ -1,4 +1,4 @@
import ResonanceVisualizer from './nexus/components/resonance-visualizer.js';\nimport * as THREE from 'three';
import * as THREE from 'three';
import { EffectComposer } from 'three/addons/postprocessing/EffectComposer.js';
import { RenderPass } from 'three/addons/postprocessing/RenderPass.js';
import { UnrealBloomPass } from 'three/addons/postprocessing/UnrealBloomPass.js';
@@ -51,7 +51,6 @@ let chatOpen = true;
let memoryFeedEntries = []; // Mnemosyne: recent memory events for feed panel
let _memoryFilterOpen = false; // Mnemosyne: filter panel state
let _clickStartX = 0, _clickStartY = 0; // Mnemosyne: click-vs-drag detection
let recentEvenniaEvents = []; // Evennia: recent mind palace events for display
let loadProgress = 0;
let performanceTier = 'high';
@@ -598,7 +597,7 @@ class PSELayer {
let pseLayer;
let resonanceViz, metaLayer, neuroBridge, cbr, symbolicPlanner, knowledgeGraph, blackboard, symbolicEngine, calibrator;
let metaLayer, neuroBridge, cbr, symbolicPlanner, knowledgeGraph, blackboard, symbolicEngine, calibrator;
let agentFSMs = {};
function setupGOFAI() {
@@ -667,7 +666,7 @@ async function init() {
scene = new THREE.Scene();
scene.fog = new THREE.FogExp2(0x050510, 0.012);
setupGOFAI();\n resonanceViz = new ResonanceVisualizer(scene);
setupGOFAI();
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.copy(playerPos);
@@ -3651,6 +3650,3 @@ init().then(() => {
connectMemPalace();
mineMemPalaceContent();
});
// Memory optimization loop
setInterval(() => { console.log('Running optimization...'); }, 60000);

View File

@@ -26,7 +26,7 @@
| Term | Meaning |
|------|---------|
| **The Robing** | ~~DEPRECATED~~ — Hermes handles all layers directly. No external gateway. |
| **The Robing** | OpenClaw (gateway) + Hermes (body) running together on one machine. |
| **Robed** | Gateway + Hermes running = fully operational wizard. |
| **Unrobed** | No gateway + Hermes = capable but invisible. |
| **Lobster** | Gateway + no Hermes = reachable but empty. **The FAILURE state.** |
@@ -117,14 +117,14 @@
**Why it works:** Naturally models the wizard hierarchy. Queries like "who can do X?" and "what blocks task Y?" resolve instantly.
**Every agent must:** Register themselves in the knowledge graph when they come online.
### TECHNIQUE 4: Hermes-Native Communication (No Gateway Layer)
### TECHNIQUE 4: The Robing Pattern (Gateway + Body Cohabitation)
**Where:** Every wizard deployment
**How:** Hermes handles both reasoning and external communication directly. No intermediary gateway. Two states: Online (Hermes running) or Dead (nothing running).
**Why it works:** Single process. No split-brain failure modes. No Lobster state possible.
**Every agent must:** Know their own state and report it via Hermes heartbeat.
**How:** OpenClaw gateway handles external communication. Hermes body handles reasoning. Both on same machine via localhost. Four states: Robed, Unrobed, Lobster, Dead.
**Why it works:** Separation of concerns. Gateway can restart without losing agent state.
**Every agent must:** Know their own state. A Lobster is a failure. Report it.
### TECHNIQUE 5: Cron-Driven Autonomous Work Dispatch
**Where:** hermes-work.sh, task-monitor.sh, progress-report.sh
**Where:** openclaw-work.sh, task-monitor.sh, progress-report.sh
**How:** Every 20 min: scan queue > pick P0 > mark IN_PROGRESS > create trigger file. Every 10 min: check completion. Every 30 min: progress report to father-messages/.
**Why it works:** No human needed for steady-state. Self-healing. Self-reporting.
**Every agent must:** Have a work queue. Have a cron schedule. Report progress.

View File

@@ -1,18 +1,99 @@
// ═══════════════════════════════════════════
// PROJECT MNEMOSYNE — MEMORY OPTIMIZER (GOFAI)
// ═══════════════════════════════════════════
//
// Heuristic-based memory pruning and organization.
// Operates without LLMs to maintain a lean, high-signal spatial index.
//
// Heuristics:
// 1. Strength Decay: Memories lose strength over time if not accessed.
// 2. Redundancy: Simple string similarity to identify duplicates.
// 3. Isolation: Memories with no connections are lower priority.
// 4. Aging: Old memories in 'working' are moved to 'archive'.
// ═══════════════════════════════════════════
class MemoryOptimizer {
constructor(options = {}) {
this.threshold = options.threshold || 0.3;
this.decayRate = options.decayRate || 0.01;
this.lastRun = Date.now();
const MemoryOptimizer = (() => {
const DECAY_RATE = 0.01; // Strength lost per optimization cycle
const PRUNE_THRESHOLD = 0.1; // Remove if strength < this
const SIMILARITY_THRESHOLD = 0.85; // Jaccard similarity for redundancy
/**
* Run a full optimization pass on the spatial memory index.
* @param {object} spatialMemory - The SpatialMemory component instance.
* @returns {object} Summary of actions taken.
*/
function optimize(spatialMemory) {
const memories = spatialMemory.getAllMemories();
const results = { pruned: 0, moved: 0, updated: 0 };
// 1. Strength Decay & Aging
memories.forEach(mem => {
let strength = mem.strength || 0.7;
strength -= DECAY_RATE;
if (strength < PRUNE_THRESHOLD) {
spatialMemory.removeMemory(mem.id);
results.pruned++;
return;
}
// Move old working memories to archive
if (mem.category === 'working') {
const timestamp = mem.timestamp || new Date().toISOString();
const age = Date.now() - new Date(timestamp).getTime();
if (age > 1000 * 60 * 60 * 24) { // 24 hours
spatialMemory.removeMemory(mem.id);
spatialMemory.placeMemory({ ...mem, category: 'archive', strength });
results.moved++;
return;
}
}
spatialMemory.updateMemory(mem.id, { strength });
results.updated++;
});
// 2. Redundancy Check (Jaccard Similarity)
const activeMemories = spatialMemory.getAllMemories();
for (let i = 0; i < activeMemories.length; i++) {
const m1 = activeMemories[i];
// Skip if already pruned in this loop
if (!spatialMemory.getAllMemories().find(m => m.id === m1.id)) continue;
for (let j = i + 1; j < activeMemories.length; j++) {
const m2 = activeMemories[j];
if (m1.category !== m2.category) continue;
const sim = _calculateSimilarity(m1.content, m2.content);
if (sim > SIMILARITY_THRESHOLD) {
// Keep the stronger one, prune the weaker
const toPrune = m1.strength >= m2.strength ? m2.id : m1.id;
spatialMemory.removeMemory(toPrune);
results.pruned++;
// If we pruned m1, we must stop checking it against others
if (toPrune === m1.id) break;
}
}
}
optimize(memories) {
const now = Date.now();
const elapsed = (now - this.lastRun) / 1000;
this.lastRun = now;
return memories.map(m => {
const decay = (m.importance || 1) * this.decayRate * elapsed;
return { ...m, strength: Math.max(0, (m.strength || 1) - decay) };
}).filter(m => m.strength > this.threshold || m.locked);
}
}
export default MemoryOptimizer;
console.info('[Mnemosyne] Optimization complete:', results);
return results;
}
/**
* Calculate Jaccard similarity between two strings.
* @private
*/
function _calculateSimilarity(s1, s2) {
if (!s1 || !s2) return 0;
const set1 = new Set(s1.toLowerCase().split(/\s+/));
const set2 = new Set(s2.toLowerCase().split(/\s+/));
const intersection = new Set([...set1].filter(x => set2.has(x)));
const union = new Set([...set1, ...set2]);
return intersection.size / union.size;
}
return { optimize };
})();
export { MemoryOptimizer };

View File

@@ -1,16 +0,0 @@
import * as THREE from 'three';
class ResonanceVisualizer {
constructor(scene) {
this.scene = scene;
this.links = [];
}
addLink(p1, p2, strength) {
const geometry = new THREE.BufferGeometry().setFromPoints([p1, p2]);
const material = new THREE.LineBasicMaterial({ color: 0x00ff00, transparent: true, opacity: strength });
const line = new THREE.Line(geometry, material);
this.scene.add(line);
this.links.push(line);
}
}
export default ResonanceVisualizer;

View File

@@ -1274,72 +1274,6 @@ class MnemosyneArchive:
"unchanged": unchanged,
}
def resonance(
self,
threshold: float = 0.3,
limit: int = 20,
topic: Optional[str] = None,
) -> list[dict]:
"""Discover latent connections — pairs with high similarity but no existing link.
The holographic linker connects entries above its threshold at ingest
time. ``resonance()`` finds entry pairs that are *semantically close*
but have *not* been linked — the hidden potential edges in the graph.
These "almost-connected" pairs reveal thematic overlap that was missed
because entries were ingested at different times or sit just below the
linker threshold.
Args:
threshold: Minimum similarity score to surface a pair (default 0.3).
Pairs already linked are excluded regardless of score.
limit: Maximum number of pairs to return (default 20).
topic: If set, restrict candidates to entries that carry this topic
(case-insensitive). Both entries in a pair must match.
Returns:
List of dicts, sorted by ``score`` descending::
{
"entry_a": {"id": str, "title": str, "topics": list[str]},
"entry_b": {"id": str, "title": str, "topics": list[str]},
"score": float, # similarity in [0, 1]
}
"""
entries = list(self._entries.values())
if topic:
topic_lower = topic.lower()
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
results: list[dict] = []
for i, entry_a in enumerate(entries):
for entry_b in entries[i + 1:]:
# Skip pairs that are already linked
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
continue
score = self.linker.compute_similarity(entry_a, entry_b)
if score < threshold:
continue
results.append({
"entry_a": {
"id": entry_a.id,
"title": entry_a.title,
"topics": entry_a.topics,
},
"entry_b": {
"id": entry_b.id,
"title": entry_b.title,
"topics": entry_b.topics,
},
"score": round(score, 4),
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.
@@ -1374,3 +1308,88 @@ class MnemosyneArchive:
self._save()
return total_links
# ─── Discovery ──────────────────────────────────────────────
def discover(
self,
count: int = 5,
prefer_fading: bool = True,
topic: Optional[str] = None,
) -> list[dict]:
"""Serendipitous entry discovery — surface forgotten knowledge.
Selects entries probabilistically, weighting toward fading (low vitality)
entries when prefer_fading=True, or toward vibrant entries when False.
Optionally filter by topic.
Touches selected entries to boost their vitality, preventing the same
entries from being repeatedly surfaced.
Args:
count: Number of entries to discover.
prefer_fading: If True, weight toward neglected entries. If False,
weight toward vibrant entries.
topic: Optional topic filter — only discover entries with this tag.
Returns:
List of dicts with keys: entry_id, title, content_preview, topics,
vitality, age_days, last_accessed
"""
import random
candidates = list(self._entries.values())
# Filter by topic if specified
if topic:
topic_lower = topic.lower()
candidates = [
e for e in candidates
if topic_lower in [t.lower() for t in e.topics]
]
if not candidates:
return []
# Compute vitality for each candidate
scored = []
for entry in candidates:
v = self._compute_vitality(entry)
scored.append((entry, v))
# Build selection weights
if prefer_fading:
# Lower vitality = higher weight. Invert and normalize.
weights = [max(0.01, 1.0 - v) for _, v in scored]
else:
# Higher vitality = higher weight
weights = [max(0.01, v) for _, v in scored]
# Sample without replacement
k = min(count, len(scored))
selected_indices = random.choices(range(len(scored)), weights=weights, k=k)
# Deduplicate while preserving order
seen = set()
unique_indices = []
for idx in selected_indices:
if idx not in seen:
seen.add(idx)
unique_indices.append(idx)
results = []
for idx in unique_indices:
entry, v = scored[idx]
# Touch to boost vitality
self.touch(entry.id)
created = self._parse_dt(entry.created_at)
age_days = (datetime.now(timezone.utc) - created).days
results.append({
"entry_id": entry.id,
"title": entry.title,
"content_preview": entry.content[:200] + "..." if len(entry.content) > 200 else entry.content,
"topics": entry.topics,
"vitality": round(v, 4),
"age_days": age_days,
"last_accessed": entry.last_accessed,
})
return results

View File

@@ -7,8 +7,8 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff,
mnemosyne resonance
mnemosyne snapshot create|list|restore|diff
mnemosyne discover [-n COUNT] [-t TOPIC] [--vibrant]
"""
from __future__ import annotations
@@ -19,7 +19,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event, ingest_directory
from nexus.mnemosyne.ingest import ingest_event
def cmd_stats(args):
@@ -65,13 +65,6 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
archive = MnemosyneArchive()
ext = [e.strip() for e in args.ext.split(",")] if args.ext else None
added = ingest_directory(archive, args.path, extensions=ext)
print(f"Ingested {added} new entries from {args.path}")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
@@ -374,21 +367,20 @@ def cmd_snapshot(args):
sys.exit(1)
def cmd_resonance(args):
def cmd_discover(args):
archive = MnemosyneArchive()
topic = args.topic if args.topic else None
pairs = archive.resonance(threshold=args.threshold, limit=args.limit, topic=topic)
if not pairs:
print("No resonant pairs found.")
results = archive.discover(
count=args.count,
prefer_fading=not args.vibrant,
topic=args.topic if args.topic else None,
)
if not results:
print("No entries found." + (" (topic filter too narrow?)" if args.topic else ""))
return
for p in pairs:
a = p["entry_a"]
b = p["entry_b"]
print(f"Score: {p['score']:.4f}")
print(f" [{a['id'][:8]}] {a['title']}")
print(f" Topics: {', '.join(a['topics']) if a['topics'] else '(none)'}")
print(f" [{b['id'][:8]}] {b['title']}")
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
for r in results:
print(f"[{r['entry_id'][:8]}] {r['title']}")
print(f" Topics: {', '.join(r['topics'])} | Vitality: {r['vitality']} | Age: {r['age_days']}d")
print(f" {r['content_preview']}")
print()
@@ -420,10 +412,6 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
id_ = sub.add_parser("ingest-dir", help="Ingest a directory of files")
id_.add_argument("path", help="Directory to ingest")
id_.add_argument("--ext", default="", help="Comma-separated extensions (default: md,txt,json)")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
@@ -494,12 +482,12 @@ def main():
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
rs = sub.add_parser("resonance", help="Discover latent connections between entries")
rs.add_argument("-t", "--threshold", type=float, default=0.3, help="Minimum similarity score (default: 0.3)")
rs.add_argument("-n", "--limit", type=int, default=20, help="Max pairs to show (default: 20)")
rs.add_argument("--topic", default="", help="Restrict to entries with this topic")
dc = sub.add_parser("discover", help="Serendipitous entry discovery")
dc.add_argument("-n", "--count", type=int, default=5, help="Number of entries to discover")
dc.add_argument("-t", "--topic", default=None, help="Filter by topic")
dc.add_argument("--vibrant", action="store_true", help="Prefer vibrant (alive) entries over fading ones")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
sn_create.add_argument("--label", default="", help="Human-readable label for the snapshot")
@@ -521,7 +509,6 @@ def main():
"stats": cmd_stats,
"search": cmd_search,
"ingest": cmd_ingest,
"ingest-dir": cmd_ingest_dir,
"link": cmd_link,
"topics": cmd_topics,
"remove": cmd_remove,
@@ -542,8 +529,8 @@ def main():
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
"discover": cmd_discover,
}
dispatch[args.command](args)

View File

@@ -1,135 +1,15 @@
"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, manual entries, and files.
Supports ingesting from MemPalace, raw events, and manual entries.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Optional, Union
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
_DEFAULT_EXTENSIONS = [".md", ".txt", ".json"]
_MAX_CHUNK_CHARS = 4000 # ~1000 tokens; split large files into chunks
def _extract_title(content: str, path: Path) -> str:
"""Return first # heading, or the file stem if none found."""
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith("# "):
return stripped[2:].strip()
return path.stem
def _make_source_ref(path: Path, mtime: float) -> str:
"""Stable identifier for a specific version of a file."""
return f"file:{path}:{int(mtime)}"
def _chunk_content(content: str) -> list[str]:
"""Split content into chunks at ## headings, falling back to fixed windows."""
if len(content) <= _MAX_CHUNK_CHARS:
return [content]
# Prefer splitting on ## section headings
parts = re.split(r"\n(?=## )", content)
if len(parts) > 1:
chunks: list[str] = []
current = ""
for part in parts:
if current and len(current) + len(part) > _MAX_CHUNK_CHARS:
chunks.append(current)
current = part
else:
current = (current + "\n" + part) if current else part
if current:
chunks.append(current)
return chunks
# Fixed-window fallback
return [content[i : i + _MAX_CHUNK_CHARS] for i in range(0, len(content), _MAX_CHUNK_CHARS)]
def ingest_file(
archive: MnemosyneArchive,
path: Union[str, Path],
) -> list[ArchiveEntry]:
"""Ingest a single file into the archive.
- Title is taken from the first ``# heading`` or the filename stem.
- Deduplication is via ``source_ref`` (absolute path + mtime); an
unchanged file is skipped and its existing entries are returned.
- Files over ``_MAX_CHUNK_CHARS`` are split on ``## `` headings (or
fixed character windows as a fallback).
Returns a list of ArchiveEntry objects (one per chunk).
"""
path = Path(path).resolve()
mtime = path.stat().st_mtime
base_ref = _make_source_ref(path, mtime)
# Return existing entries if this file version was already ingested
existing = [e for e in archive._entries.values() if e.source_ref and e.source_ref.startswith(base_ref)]
if existing:
return existing
content = path.read_text(encoding="utf-8", errors="replace")
title = _extract_title(content, path)
chunks = _chunk_content(content)
entries: list[ArchiveEntry] = []
for i, chunk in enumerate(chunks):
chunk_ref = base_ref if len(chunks) == 1 else f"{base_ref}:chunk{i}"
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1})"
entry = ArchiveEntry(
title=chunk_title,
content=chunk,
source="file",
source_ref=chunk_ref,
metadata={
"file_path": str(path),
"chunk": i,
"total_chunks": len(chunks),
},
)
archive.add(entry)
entries.append(entry)
return entries
def ingest_directory(
archive: MnemosyneArchive,
dir_path: Union[str, Path],
extensions: Optional[list[str]] = None,
) -> int:
"""Walk a directory tree and ingest all matching files.
``extensions`` defaults to ``[".md", ".txt", ".json"]``.
Values may be given with or without a leading dot.
Returns the count of new archive entries created.
"""
dir_path = Path(dir_path).resolve()
if extensions is None:
exts = _DEFAULT_EXTENSIONS
else:
exts = [e if e.startswith(".") else f".{e}" for e in extensions]
added = 0
for file_path in sorted(dir_path.rglob("*")):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in exts:
continue
before = archive.count
ingest_file(archive, file_path)
added += archive.count - before
return added
def ingest_from_mempalace(
archive: MnemosyneArchive,

View File

@@ -1,14 +0,0 @@
class Reasoner:
def __init__(self, rules):
self.rules = rules
def evaluate(self, entries):
return [r['action'] for r in self.rules if self._check(r['condition'], entries)]
def _check(self, cond, entries):
if cond.startswith('count'):
# e.g. count(type=anomaly)>3
p = cond.replace('count(', '').split(')')
key, val = p[0].split('=')
count = sum(1 for e in entries if e.get(key) == val)
return eval(f"{count}{p[1]}")
return False

View File

@@ -1,22 +0,0 @@
"""Resonance Linker — Finds second-degree connections in the holographic graph."""
class ResonanceLinker:
def __init__(self, archive):
self.archive = archive
def find_resonance(self, entry_id, depth=2):
"""Find entries that are connected via shared neighbors."""
if entry_id not in self.archive._entries: return []
entry = self.archive._entries[entry_id]
neighbors = set(entry.links)
resonance = {}
for neighbor_id in neighbors:
if neighbor_id in self.archive._entries:
for second_neighbor in self.archive._entries[neighbor_id].links:
if second_neighbor != entry_id and second_neighbor not in neighbors:
resonance[second_neighbor] = resonance.get(second_neighbor, 0) + 1
return sorted(resonance.items(), key=lambda x: x[1], reverse=True)

View File

@@ -1,6 +0,0 @@
[
{
"condition": "count(type=anomaly)>3",
"action": "alert"
}
]

View File

@@ -1,2 +0,0 @@
import json
# Snapshot logic

View File

@@ -1 +1,85 @@
# Test discover
"""Tests for Mnemosyne discover functionality."""
import tempfile
from pathlib import Path
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _make_archive_with_entries():
"""Helper: create an archive with test entries."""
path = Path(tempfile.mkdtemp()) / "test_archive.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
ingest_event(archive, title="Python automation", content="Building tools in Python", topics=["python", "automation"])
ingest_event(archive, title="Cooking pasta", content="How to make carbonara", topics=["cooking"])
ingest_event(archive, title="Bitcoin basics", content="Understanding Bitcoin and blockchain", topics=["bitcoin", "crypto"])
ingest_event(archive, title="AI agents", content="Building autonomous AI agents", topics=["ai", "agents"])
ingest_event(archive, title="Meditation guide", content="Mindfulness and meditation techniques", topics=["wellness"])
return archive
def test_discover_returns_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=3)
assert len(results) == 3
for r in results:
assert "entry_id" in r
assert "title" in r
assert "content_preview" in r
assert "topics" in r
assert "vitality" in r
assert "age_days" in r
def test_discover_respects_count():
archive = _make_archive_with_entries()
results = archive.discover(count=2)
assert len(results) == 2
def test_discover_count_exceeds_entries():
archive = _make_archive_with_entries()
results = archive.discover(count=100)
assert len(results) == archive.count
def test_discover_topic_filter():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="python")
assert len(results) == 1
assert results[0]["title"] == "Python automation"
def test_discover_topic_case_insensitive():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="Python")
assert len(results) == 1
def test_discover_empty_topic_returns_nothing():
archive = _make_archive_with_entries()
results = archive.discover(count=10, topic="nonexistent")
assert len(results) == 0
def test_discover_boosts_vitality():
archive = _make_archive_with_entries()
# Get initial vitality
before = archive.fading(limit=5)
# Discover (which touches entries)
archive.discover(count=3)
# The touched entries should have higher vitality now
after = archive.fading(limit=5)
# At least some entries should have changed vitality
before_vitals = {e["entry_id"]: e["vitality"] for e in before}
after_vitals = {e["entry_id"]: e["vitality"] for e in after}
changed = sum(1 for eid in before_vitals if eid in after_vitals and abs(before_vitals[eid] - after_vitals[eid]) > 0.001)
assert changed >= 1, "Discover should touch and boost vitality of selected entries"
def test_discover_empty_archive():
path = Path(tempfile.mkdtemp()) / "empty.json"
archive = MnemosyneArchive(archive_path=path, auto_embed=False)
results = archive.discover(count=5)
assert len(results) == 0

View File

@@ -1,241 +0,0 @@
"""Tests for file-based ingestion pipeline (ingest_file / ingest_directory)."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import (
_DEFAULT_EXTENSIONS,
_MAX_CHUNK_CHARS,
_chunk_content,
_extract_title,
_make_source_ref,
ingest_directory,
ingest_file,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_archive(tmp_path: Path) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=tmp_path / "archive.json")
# ---------------------------------------------------------------------------
# Unit: _extract_title
# ---------------------------------------------------------------------------
def test_extract_title_from_heading():
content = "# My Document\n\nSome content here."
assert _extract_title(content, Path("ignored.md")) == "My Document"
def test_extract_title_fallback_to_stem():
content = "No heading at all."
assert _extract_title(content, Path("/docs/my_notes.md")) == "my_notes"
def test_extract_title_skips_non_h1():
content = "## Not an H1\n# Actual Title\nContent."
assert _extract_title(content, Path("x.md")) == "Actual Title"
# ---------------------------------------------------------------------------
# Unit: _make_source_ref
# ---------------------------------------------------------------------------
def test_source_ref_format():
p = Path("/tmp/foo.md")
ref = _make_source_ref(p, 1234567890.9)
assert ref == "file:/tmp/foo.md:1234567890"
def test_source_ref_truncates_fractional_mtime():
p = Path("/tmp/a.txt")
assert _make_source_ref(p, 100.99) == _make_source_ref(p, 100.01)
# ---------------------------------------------------------------------------
# Unit: _chunk_content
# ---------------------------------------------------------------------------
def test_chunk_short_content_is_single():
content = "Short content."
assert _chunk_content(content) == [content]
def test_chunk_splits_on_h2():
section_a = "# Intro\n\nIntroductory text. " + "x" * 100
section_b = "## Section B\n\nBody of section B. " + "y" * 100
content = section_a + "\n" + section_b
# Force chunking by using a small fake limit would require patching;
# instead build content large enough to exceed the real limit.
big_a = "# Intro\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
big_b = "## Section B\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
combined = big_a + "\n" + big_b
chunks = _chunk_content(combined)
assert len(chunks) >= 2
assert any("Section B" in c for c in chunks)
def test_chunk_fixed_window_fallback():
# Content with no ## headings but > MAX_CHUNK_CHARS
content = "word " * (_MAX_CHUNK_CHARS // 5 + 100)
chunks = _chunk_content(content)
assert len(chunks) >= 2
for c in chunks:
assert len(c) <= _MAX_CHUNK_CHARS
# ---------------------------------------------------------------------------
# ingest_file
# ---------------------------------------------------------------------------
def test_ingest_file_returns_entry(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "notes.md"
doc.write_text("# My Notes\n\nHello world.")
entries = ingest_file(archive, doc)
assert len(entries) == 1
assert entries[0].title == "My Notes"
assert entries[0].source == "file"
assert "Hello world" in entries[0].content
def test_ingest_file_uses_stem_when_no_heading(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "raw_log.txt"
doc.write_text("Just some plain text without a heading.")
entries = ingest_file(archive, doc)
assert entries[0].title == "raw_log"
def test_ingest_file_dedup_unchanged(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "doc.md"
doc.write_text("# Title\n\nContent.")
entries1 = ingest_file(archive, doc)
assert archive.count == 1
# Re-ingest without touching the file — mtime unchanged
entries2 = ingest_file(archive, doc)
assert archive.count == 1 # no duplicate
assert entries2[0].id == entries1[0].id
def test_ingest_file_reingest_after_change(tmp_path):
import os
archive = _make_archive(tmp_path)
doc = tmp_path / "doc.md"
doc.write_text("# Title\n\nOriginal content.")
ingest_file(archive, doc)
assert archive.count == 1
# Write new content, then force mtime forward by 100s so int(mtime) differs
doc.write_text("# Title\n\nUpdated content.")
new_mtime = doc.stat().st_mtime + 100
os.utime(doc, (new_mtime, new_mtime))
ingest_file(archive, doc)
# A new entry is created for the new version
assert archive.count == 2
def test_ingest_file_source_ref_contains_path(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "thing.txt"
doc.write_text("Plain text.")
entries = ingest_file(archive, doc)
assert str(doc) in entries[0].source_ref
def test_ingest_file_large_produces_chunks(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "big.md"
# Build content with clear ## sections large enough to trigger chunking
big_a = "# Doc\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
big_b = "## Part Two\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
doc.write_text(big_a + "\n" + big_b)
entries = ingest_file(archive, doc)
assert len(entries) >= 2
assert any("part" in e.title.lower() for e in entries)
# ---------------------------------------------------------------------------
# ingest_directory
# ---------------------------------------------------------------------------
def test_ingest_directory_basic(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("# Alpha\n\nFirst doc.")
(docs / "b.txt").write_text("Beta plain text.")
(docs / "skip.py").write_text("# This should not be ingested")
added = ingest_directory(archive, docs)
assert added == 2
assert archive.count == 2
def test_ingest_directory_custom_extensions(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("# Alpha")
(docs / "b.py").write_text("No heading — uses stem.")
added = ingest_directory(archive, docs, extensions=["py"])
assert added == 1
titles = [e.title for e in archive._entries.values()]
assert any("b" in t for t in titles)
def test_ingest_directory_ext_without_dot(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "notes.md").write_text("# Notes\n\nContent.")
added = ingest_directory(archive, docs, extensions=["md"])
assert added == 1
def test_ingest_directory_no_duplicates_on_rerun(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "file.md").write_text("# Stable\n\nSame content.")
ingest_directory(archive, docs)
assert archive.count == 1
added_second = ingest_directory(archive, docs)
assert added_second == 0
assert archive.count == 1
def test_ingest_directory_recurses_subdirs(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
sub = docs / "sub"
sub.mkdir(parents=True)
(docs / "top.md").write_text("# Top level")
(sub / "nested.md").write_text("# Nested")
added = ingest_directory(archive, docs)
assert added == 2
def test_ingest_directory_default_extensions(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("markdown")
(docs / "b.txt").write_text("text")
(docs / "c.json").write_text('{"key": "value"}')
(docs / "d.yaml").write_text("key: value")
added = ingest_directory(archive, docs)
assert added == 3 # md, txt, json — not yaml

View File

@@ -1 +0,0 @@
# Test resonance

View File

@@ -1 +0,0 @@
# Test snapshot

View File

@@ -1,5 +1,27 @@
#!/bin/bash
echo "Running GOFAI guardrails..."
# Syntax checks
find . -name "*.js" -exec node --check {} +
echo "Guardrails passed."
# [Mnemosyne] Agent Guardrails — The Nexus
# Validates code integrity and scans for secrets before deployment.
echo "--- [Mnemosyne] Running Guardrails ---"
# 1. Syntax Checks
echo "[1/3] Validating syntax..."
for f in ; do
node --check "$f" || { echo "Syntax error in $f"; exit 1; }
done
echo "Syntax OK."
# 2. JSON/YAML Validation
echo "[2/3] Validating configs..."
for f in ; do
node -e "JSON.parse(require('fs').readFileSync('$f'))" || { echo "Invalid JSON: $f"; exit 1; }
done
echo "Configs OK."
# 3. Secret Scan
echo "[3/3] Scanning for secrets..."
grep -rE "AI_|TOKEN|KEY|SECRET" . --exclude-dir=node_modules --exclude=guardrails.sh | grep -v "process.env" && {
echo "WARNING: Potential secrets found!"
} || echo "No secrets detected."
echo "--- Guardrails Passed ---"

View File

@@ -1,4 +1,26 @@
/**
* [Mnemosyne] Smoke Test — The Nexus
* Verifies core components are loadable and basic state is consistent.
*/
import MemoryOptimizer from '../nexus/components/memory-optimizer.js';
const optimizer = new MemoryOptimizer();
console.log('Smoke test passed');
import { SpatialMemory } from '../nexus/components/spatial-memory.js';
import { MemoryOptimizer } from '../nexus/components/memory-optimizer.js';
console.log('--- [Mnemosyne] Running Smoke Test ---');
// 1. Verify Components
if (!SpatialMemory || !MemoryOptimizer) {
console.error('Failed to load core components');
process.exit(1);
}
console.log('Components loaded.');
// 2. Verify Regions
const regions = Object.keys(SpatialMemory.REGIONS || {});
if (regions.length < 5) {
console.error('SpatialMemory regions incomplete:', regions);
process.exit(1);
}
console.log('Regions verified:', regions.join(', '));
console.log('--- Smoke Test Passed ---');