Compare commits

...

5 Commits

Author SHA1 Message Date
98cdc34a36 feat: add ingest-dir CLI command (#1275)
mnemosyne ingest-dir <path> [--ext md,txt] [--topics topic1,topic2]
2026-04-12 11:51:56 +00:00
63ac52dc24 feat: export ingest_file and ingest_directory 2026-04-12 11:47:55 +00:00
25f6ffc050 feat: add file and directory ingestion pipeline (#1275)
- ingest_file() reads a single file, extracts title from headings, chunks large files
- ingest_directory() walks directory tree, ingests matching files
- Dedup via source_ref (file path + mtime)
- Chunking at heading and paragraph boundaries for large files
2026-04-12 11:47:20 +00:00
0f87258a1e test: verify PUT API works 2026-04-12 11:46:20 +00:00
72d9c1a303 [claude] Mnemosyne Memory Resonance — latent connection discovery (#1272) (#1274)
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 4s
2026-04-12 11:18:54 +00:00
5 changed files with 444 additions and 4 deletions

View File

@@ -13,7 +13,7 @@ from __future__ import annotations
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event, ingest_file, ingest_directory
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
@@ -27,6 +27,8 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"ingest_file",
"ingest_directory",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",

View File

@@ -1274,6 +1274,72 @@ class MnemosyneArchive:
"unchanged": unchanged,
}
def resonance(
self,
threshold: float = 0.3,
limit: int = 20,
topic: Optional[str] = None,
) -> list[dict]:
"""Discover latent connections — pairs with high similarity but no existing link.
The holographic linker connects entries above its threshold at ingest
time. ``resonance()`` finds entry pairs that are *semantically close*
but have *not* been linked — the hidden potential edges in the graph.
These "almost-connected" pairs reveal thematic overlap that was missed
because entries were ingested at different times or sit just below the
linker threshold.
Args:
threshold: Minimum similarity score to surface a pair (default 0.3).
Pairs already linked are excluded regardless of score.
limit: Maximum number of pairs to return (default 20).
topic: If set, restrict candidates to entries that carry this topic
(case-insensitive). Both entries in a pair must match.
Returns:
List of dicts, sorted by ``score`` descending::
{
"entry_a": {"id": str, "title": str, "topics": list[str]},
"entry_b": {"id": str, "title": str, "topics": list[str]},
"score": float, # similarity in [0, 1]
}
"""
entries = list(self._entries.values())
if topic:
topic_lower = topic.lower()
entries = [e for e in entries if topic_lower in [t.lower() for t in e.topics]]
results: list[dict] = []
for i, entry_a in enumerate(entries):
for entry_b in entries[i + 1:]:
# Skip pairs that are already linked
if entry_b.id in entry_a.links or entry_a.id in entry_b.links:
continue
score = self.linker.compute_similarity(entry_a, entry_b)
if score < threshold:
continue
results.append({
"entry_a": {
"id": entry_a.id,
"title": entry_a.title,
"topics": entry_a.topics,
},
"entry_b": {
"id": entry_b.id,
"title": entry_b.title,
"topics": entry_b.topics,
},
"score": round(score, 4),
})
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit]
def rebuild_links(self, threshold: Optional[float] = None) -> int:
"""Recompute all links from scratch.

View File

@@ -7,7 +7,9 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne timeline, mnemosyne neighbors, mnemosyne path,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff
mnemosyne snapshot create|list|restore|diff,
mnemosyne resonance,
mnemosyne ingest-dir
"""
from __future__ import annotations
@@ -18,7 +20,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
from nexus.mnemosyne.ingest import ingest_event, ingest_file, ingest_directory
def cmd_stats(args):
@@ -64,6 +66,21 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
archive = MnemosyneArchive()
exts = set(args.extensions.split(",")) if args.extensions else None
stats = ingest_directory(
archive,
dir_path=args.path,
extensions=exts,
topics=args.topics.split(",") if args.topics else [],
)
print(f"Scanned: {stats['files_scanned']} files")
print(f"Ingested: {stats['files_ingested']} files -> {stats['entries_added']} entries")
print(f"Skipped: {stats['skipped']} files")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
@@ -366,6 +383,24 @@ def cmd_snapshot(args):
sys.exit(1)
def cmd_resonance(args):
archive = MnemosyneArchive()
topic = args.topic if args.topic else None
pairs = archive.resonance(threshold=args.threshold, limit=args.limit, topic=topic)
if not pairs:
print("No resonant pairs found.")
return
for p in pairs:
a = p["entry_a"]
b = p["entry_b"]
print(f"Score: {p['score']:.4f}")
print(f" [{a['id'][:8]}] {a['title']}")
print(f" Topics: {', '.join(a['topics']) if a['topics'] else '(none)'}")
print(f" [{b['id'][:8]}] {b['title']}")
print(f" Topics: {', '.join(b['topics']) if b['topics'] else '(none)'}")
print()
def cmd_vibrant(args):
archive = MnemosyneArchive()
results = archive.vibrant(limit=args.limit)
@@ -394,6 +429,12 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
id = sub.add_parser("ingest-dir", help="Ingest all files from a directory")
id.add_argument("path", help="Directory path to ingest")
id.add_argument("--ext", dest="extensions", default="", help="Comma-separated extensions (default: .md,.txt)")
id.add_argument("--topics", default="", help="Comma-separated topics to tag all entries")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
@@ -464,6 +505,11 @@ def main():
vb = sub.add_parser("vibrant", help="Show most alive entries (highest vitality)")
vb.add_argument("-n", "--limit", type=int, default=10, help="Max entries to show")
rs = sub.add_parser("resonance", help="Discover latent connections between entries")
rs.add_argument("-t", "--threshold", type=float, default=0.3, help="Minimum similarity score (default: 0.3)")
rs.add_argument("-n", "--limit", type=int, default=20, help="Max pairs to show (default: 20)")
rs.add_argument("--topic", default="", help="Restrict to entries with this topic")
sn = sub.add_parser("snapshot", help="Point-in-time backup and restore")
sn_sub = sn.add_subparsers(dest="snapshot_cmd")
sn_create = sn_sub.add_parser("create", help="Create a new snapshot")
@@ -506,7 +552,9 @@ def main():
"vitality": cmd_vitality,
"fading": cmd_fading,
"vibrant": cmd_vibrant,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
"ingest-dir": cmd_ingest_dir,
}
dispatch[args.command](args)

View File

@@ -1,15 +1,25 @@
"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, and manual entries.
Supports ingesting from MemPalace, raw events, files, and directories.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
# Default max chunk size in characters (roughly ~2000 tokens)
_DEFAULT_CHUNK_SIZE = 8000
# File extensions recognized for ingestion
_TEXT_EXTENSIONS = {".md", ".txt", ".rst", ".log", ".py", ".js", ".yaml", ".yml", ".json", ".toml", ".cfg", ".ini"}
_DEFAULT_EXTENSIONS = {".md", ".txt"}
def ingest_from_mempalace(
archive: MnemosyneArchive,
@@ -60,3 +70,179 @@ def ingest_event(
metadata=metadata or {},
)
return archive.add(entry)
def _extract_title(content: str, fallback: str = "Untitled") -> str:
"""Extract title from first markdown heading, or use fallback."""
for line in content.split("\n")[:10]:
line = line.strip()
m = re.match(r"^#{1,6}\s+(.+)$", line)
if m:
return m.group(1).strip()
for line in content.split("\n")[:5]:
line = line.strip()
if line and len(line) > 3:
return line[:120]
return fallback
def _chunk_content(content: str, max_size: int = _DEFAULT_CHUNK_SIZE) -> list[str]:
"""Split content into chunks at heading boundaries.
Splits on ## headings when content exceeds max_size.
Falls back to paragraph boundaries, then fixed-size splits.
"""
if len(content) <= max_size:
return [content]
chunks: list[str] = []
parts = re.split(r"(\n## )", content)
current = ""
for part in parts:
if len(current) + len(part) > max_size and current:
chunks.append(current.strip())
current = part
else:
current += part
if current.strip():
chunks.append(current.strip())
# If a single chunk is still too large, split on paragraphs
final_chunks: list[str] = []
for chunk in chunks:
if len(chunk) <= max_size:
final_chunks.append(chunk)
else:
paragraphs = chunk.split("\n\n")
para_current = ""
for para in paragraphs:
if len(para_current) + len(para) + 2 > max_size and para_current:
final_chunks.append(para_current.strip())
para_current = para
else:
para_current = para_current + "\n\n" + para if para_current else para
if para_current.strip():
final_chunks.append(para_current.strip())
return final_chunks if final_chunks else [content[:max_size]]
def ingest_file(
archive: MnemosyneArchive,
file_path,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> list:
"""Ingest a single file into the archive.
Extracts title from first markdown heading (or filename).
Large files are chunked at heading boundaries.
Re-ingesting the same unchanged file returns existing entries (dedup via source_ref).
Args:
archive: The MnemosyneArchive to ingest into.
file_path: Path to the file.
source: Source label (default "file").
topics: Topic tags to attach to entries.
max_chunk_size: Maximum characters per chunk before splitting.
Returns:
List of ArchiveEntry objects created (or existing if deduped).
Raises:
FileNotFoundError: If file_path does not exist.
UnicodeDecodeError: If file cannot be decoded as UTF-8.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
stat = path.stat()
source_ref = f"{path.resolve()}:{int(stat.st_mtime)}"
# Check if already ingested (same path + mtime)
existing = [e for e in archive._entries.values() if e.source_ref == source_ref]
if existing:
return existing
content = path.read_text(encoding="utf-8")
if not content.strip():
return []
title = _extract_title(content, fallback=path.stem)
chunks = _chunk_content(content, max_chunk_size)
entries: list = []
for i, chunk in enumerate(chunks):
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1}/{len(chunks)})"
entry = ArchiveEntry(
title=chunk_title,
content=chunk,
source=source,
source_ref=source_ref if len(chunks) == 1 else f"{source_ref}#chunk{i}",
topics=topics or [],
metadata={
"file_path": str(path.resolve()),
"file_name": path.name,
"file_size": stat.st_size,
"file_mtime": stat.st_mtime,
"chunk_index": i,
"total_chunks": len(chunks),
},
)
archive.add(entry)
entries.append(entry)
return entries
def ingest_directory(
archive: MnemosyneArchive,
dir_path,
extensions: Optional[set[str]] = None,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
recursive: bool = True,
) -> dict:
"""Ingest all matching files from a directory tree.
Args:
archive: The MnemosyneArchive to ingest into.
dir_path: Root directory to scan.
extensions: File extensions to include (default: .md, .txt).
source: Source label for ingested entries.
topics: Topic tags to attach to all entries.
max_chunk_size: Maximum characters per chunk before splitting.
recursive: Whether to recurse into subdirectories.
Returns:
Dict with keys: files_scanned, files_ingested, entries_added, skipped
"""
root = Path(dir_path)
if not root.is_dir():
raise NotADirectoryError(f"Not a directory: {dir_path}")
exts = extensions or _DEFAULT_EXTENSIONS
stats = {"files_scanned": 0, "files_ingested": 0, "entries_added": 0, "skipped": 0}
pattern = "**/*" if recursive else "*"
for file_path in sorted(root.glob(pattern)):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in exts:
continue
stats["files_scanned"] += 1
try:
entries = ingest_file(archive, file_path, source=source, topics=topics, max_chunk_size=max_chunk_size)
if entries:
stats["files_ingested"] += 1
stats["entries_added"] += len(entries)
else:
stats["skipped"] += 1
except (UnicodeDecodeError, OSError):
stats["skipped"] += 1
return stats

View File

@@ -0,0 +1,138 @@
"""Tests for MnemosyneArchive.resonance() — latent connection discovery."""
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import ingest_event
def _archive(tmp_path: Path) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=tmp_path / "archive.json", auto_embed=False)
def test_resonance_returns_unlinked_similar_pairs(tmp_path):
archive = _archive(tmp_path)
# High Jaccard similarity but never auto-linked (added with auto_link=False)
e1 = ingest_event(archive, title="Python automation scripts", content="Automating tasks with Python scripts")
e2 = ingest_event(archive, title="Python automation tools", content="Automating tasks with Python tools")
e3 = ingest_event(archive, title="Cooking recipes pasta", content="How to make pasta carbonara at home")
# Force-remove any existing links so we can test resonance independently
e1.links = []
e2.links = []
e3.links = []
archive._save()
pairs = archive.resonance(threshold=0.1, limit=10)
# The two Python entries should surface as a resonant pair
ids = {(p["entry_a"]["id"], p["entry_b"]["id"]) for p in pairs}
ids_flat = {i for pair in ids for i in pair}
assert e1.id in ids_flat and e2.id in ids_flat, "Semantically similar entries should appear as resonant pair"
def test_resonance_excludes_already_linked_pairs(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Python automation scripts", content="Automating tasks with Python scripts")
e2 = ingest_event(archive, title="Python automation tools", content="Automating tasks with Python tools")
# Manually link them
e1.links = [e2.id]
e2.links = [e1.id]
archive._save()
pairs = archive.resonance(threshold=0.0, limit=100)
for p in pairs:
a_id = p["entry_a"]["id"]
b_id = p["entry_b"]["id"]
assert not (a_id == e1.id and b_id == e2.id), "Already-linked pair should be excluded"
assert not (a_id == e2.id and b_id == e1.id), "Already-linked pair should be excluded"
def test_resonance_sorted_by_score_descending(tmp_path):
archive = _archive(tmp_path)
ingest_event(archive, title="Python coding automation", content="Automating Python coding workflows")
ingest_event(archive, title="Python scripts automation", content="Automation via Python scripting")
ingest_event(archive, title="Cooking food at home", content="Home cooking and food preparation")
# Clear all links to test resonance
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=10)
scores = [p["score"] for p in pairs]
assert scores == sorted(scores, reverse=True), "Pairs must be sorted by score descending"
def test_resonance_limit_respected(tmp_path):
archive = _archive(tmp_path)
for i in range(10):
ingest_event(archive, title=f"Python entry {i}", content=f"Python automation entry number {i}")
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=3)
assert len(pairs) <= 3
def test_resonance_topic_filter(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Python tools", content="Python automation tooling", topics=["python"])
e2 = ingest_event(archive, title="Python scripts", content="Python automation scripting", topics=["python"])
e3 = ingest_event(archive, title="Cooking pasta", content="Pasta carbonara recipe cooking", topics=["cooking"])
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=20, topic="python")
for p in pairs:
a_topics = [t.lower() for t in p["entry_a"]["topics"]]
b_topics = [t.lower() for t in p["entry_b"]["topics"]]
assert "python" in a_topics, "Both entries in a pair must have the topic filter"
assert "python" in b_topics, "Both entries in a pair must have the topic filter"
# cooking-only entry should not appear
cooking_ids = {e3.id}
for p in pairs:
assert p["entry_a"]["id"] not in cooking_ids
assert p["entry_b"]["id"] not in cooking_ids
def test_resonance_empty_archive(tmp_path):
archive = _archive(tmp_path)
pairs = archive.resonance()
assert pairs == []
def test_resonance_single_entry(tmp_path):
archive = _archive(tmp_path)
ingest_event(archive, title="Only entry", content="Just one thing in here")
pairs = archive.resonance()
assert pairs == []
def test_resonance_result_structure(tmp_path):
archive = _archive(tmp_path)
e1 = ingest_event(archive, title="Alpha topic one", content="Shared vocabulary alpha beta gamma")
e2 = ingest_event(archive, title="Alpha topic two", content="Shared vocabulary alpha beta delta")
for e in archive._entries.values():
e.links = []
archive._save()
pairs = archive.resonance(threshold=0.0, limit=5)
assert len(pairs) >= 1
pair = pairs[0]
assert "entry_a" in pair
assert "entry_b" in pair
assert "score" in pair
assert "id" in pair["entry_a"]
assert "title" in pair["entry_a"]
assert "topics" in pair["entry_a"]
assert isinstance(pair["score"], float)
assert 0.0 <= pair["score"] <= 1.0