Compare commits

..

4 Commits

Author SHA1 Message Date
98cdc34a36 feat: add ingest-dir CLI command (#1275)
mnemosyne ingest-dir <path> [--ext md,txt] [--topics topic1,topic2]
2026-04-12 11:51:56 +00:00
63ac52dc24 feat: export ingest_file and ingest_directory 2026-04-12 11:47:55 +00:00
25f6ffc050 feat: add file and directory ingestion pipeline (#1275)
- ingest_file() reads a single file, extracts title from headings, chunks large files
- ingest_directory() walks directory tree, ingests matching files
- Dedup via source_ref (file path + mtime)
- Chunking at heading and paragraph boundaries for large files
2026-04-12 11:47:20 +00:00
0f87258a1e test: verify PUT API works 2026-04-12 11:46:20 +00:00
5 changed files with 208 additions and 370 deletions

View File

@@ -177,7 +177,7 @@ The rule is:
- rescue good work from legacy Matrix
- rebuild inside `the-nexus`
- keep telemetry and durable truth flowing through the Hermes harness
- Hermes is the sole harness — no external gateway dependencies
- keep OpenClaw as a sidecar, not the authority
## Verified historical browser-world snapshot

View File

@@ -13,7 +13,7 @@ from __future__ import annotations
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event, ingest_file, ingest_directory
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
@@ -27,6 +27,8 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"ingest_file",
"ingest_directory",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",

View File

@@ -8,7 +8,8 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff,
mnemosyne resonance
mnemosyne resonance,
mnemosyne ingest-dir
"""
from __future__ import annotations
@@ -19,7 +20,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event, ingest_directory
from nexus.mnemosyne.ingest import ingest_event, ingest_file, ingest_directory
def cmd_stats(args):
@@ -65,11 +66,19 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
archive = MnemosyneArchive()
ext = [e.strip() for e in args.ext.split(",")] if args.ext else None
added = ingest_directory(archive, args.path, extensions=ext)
print(f"Ingested {added} new entries from {args.path}")
exts = set(args.extensions.split(",")) if args.extensions else None
stats = ingest_directory(
archive,
dir_path=args.path,
extensions=exts,
topics=args.topics.split(",") if args.topics else [],
)
print(f"Scanned: {stats['files_scanned']} files")
print(f"Ingested: {stats['files_ingested']} files -> {stats['entries_added']} entries")
print(f"Skipped: {stats['skipped']} files")
def cmd_link(args):
@@ -420,9 +429,11 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
id_ = sub.add_parser("ingest-dir", help="Ingest a directory of files")
id_.add_argument("path", help="Directory to ingest")
id_.add_argument("--ext", default="", help="Comma-separated extensions (default: md,txt,json)")
id = sub.add_parser("ingest-dir", help="Ingest all files from a directory")
id.add_argument("path", help="Directory path to ingest")
id.add_argument("--ext", dest="extensions", default="", help="Comma-separated extensions (default: .md,.txt)")
id.add_argument("--topics", default="", help="Comma-separated topics to tag all entries")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
@@ -521,7 +532,6 @@ def main():
"stats": cmd_stats,
"search": cmd_search,
"ingest": cmd_ingest,
"ingest-dir": cmd_ingest_dir,
"link": cmd_link,
"topics": cmd_topics,
"remove": cmd_remove,
@@ -544,6 +554,7 @@ def main():
"vibrant": cmd_vibrant,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
"ingest-dir": cmd_ingest_dir,
}
dispatch[args.command](args)

View File

@@ -1,134 +1,24 @@
"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, manual entries, and files.
Supports ingesting from MemPalace, raw events, files, and directories.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Optional, Union
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
_DEFAULT_EXTENSIONS = [".md", ".txt", ".json"]
_MAX_CHUNK_CHARS = 4000 # ~1000 tokens; split large files into chunks
# Default max chunk size in characters (roughly ~2000 tokens)
_DEFAULT_CHUNK_SIZE = 8000
def _extract_title(content: str, path: Path) -> str:
"""Return first # heading, or the file stem if none found."""
for line in content.splitlines():
stripped = line.strip()
if stripped.startswith("# "):
return stripped[2:].strip()
return path.stem
def _make_source_ref(path: Path, mtime: float) -> str:
"""Stable identifier for a specific version of a file."""
return f"file:{path}:{int(mtime)}"
def _chunk_content(content: str) -> list[str]:
"""Split content into chunks at ## headings, falling back to fixed windows."""
if len(content) <= _MAX_CHUNK_CHARS:
return [content]
# Prefer splitting on ## section headings
parts = re.split(r"\n(?=## )", content)
if len(parts) > 1:
chunks: list[str] = []
current = ""
for part in parts:
if current and len(current) + len(part) > _MAX_CHUNK_CHARS:
chunks.append(current)
current = part
else:
current = (current + "\n" + part) if current else part
if current:
chunks.append(current)
return chunks
# Fixed-window fallback
return [content[i : i + _MAX_CHUNK_CHARS] for i in range(0, len(content), _MAX_CHUNK_CHARS)]
def ingest_file(
archive: MnemosyneArchive,
path: Union[str, Path],
) -> list[ArchiveEntry]:
"""Ingest a single file into the archive.
- Title is taken from the first ``# heading`` or the filename stem.
- Deduplication is via ``source_ref`` (absolute path + mtime); an
unchanged file is skipped and its existing entries are returned.
- Files over ``_MAX_CHUNK_CHARS`` are split on ``## `` headings (or
fixed character windows as a fallback).
Returns a list of ArchiveEntry objects (one per chunk).
"""
path = Path(path).resolve()
mtime = path.stat().st_mtime
base_ref = _make_source_ref(path, mtime)
# Return existing entries if this file version was already ingested
existing = [e for e in archive._entries.values() if e.source_ref and e.source_ref.startswith(base_ref)]
if existing:
return existing
content = path.read_text(encoding="utf-8", errors="replace")
title = _extract_title(content, path)
chunks = _chunk_content(content)
entries: list[ArchiveEntry] = []
for i, chunk in enumerate(chunks):
chunk_ref = base_ref if len(chunks) == 1 else f"{base_ref}:chunk{i}"
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1})"
entry = ArchiveEntry(
title=chunk_title,
content=chunk,
source="file",
source_ref=chunk_ref,
metadata={
"file_path": str(path),
"chunk": i,
"total_chunks": len(chunks),
},
)
archive.add(entry)
entries.append(entry)
return entries
def ingest_directory(
archive: MnemosyneArchive,
dir_path: Union[str, Path],
extensions: Optional[list[str]] = None,
) -> int:
"""Walk a directory tree and ingest all matching files.
``extensions`` defaults to ``[".md", ".txt", ".json"]``.
Values may be given with or without a leading dot.
Returns the count of new archive entries created.
"""
dir_path = Path(dir_path).resolve()
if extensions is None:
exts = _DEFAULT_EXTENSIONS
else:
exts = [e if e.startswith(".") else f".{e}" for e in extensions]
added = 0
for file_path in sorted(dir_path.rglob("*")):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in exts:
continue
before = archive.count
ingest_file(archive, file_path)
added += archive.count - before
return added
# File extensions recognized for ingestion
_TEXT_EXTENSIONS = {".md", ".txt", ".rst", ".log", ".py", ".js", ".yaml", ".yml", ".json", ".toml", ".cfg", ".ini"}
_DEFAULT_EXTENSIONS = {".md", ".txt"}
def ingest_from_mempalace(
@@ -180,3 +70,179 @@ def ingest_event(
metadata=metadata or {},
)
return archive.add(entry)
def _extract_title(content: str, fallback: str = "Untitled") -> str:
"""Extract title from first markdown heading, or use fallback."""
for line in content.split("\n")[:10]:
line = line.strip()
m = re.match(r"^#{1,6}\s+(.+)$", line)
if m:
return m.group(1).strip()
for line in content.split("\n")[:5]:
line = line.strip()
if line and len(line) > 3:
return line[:120]
return fallback
def _chunk_content(content: str, max_size: int = _DEFAULT_CHUNK_SIZE) -> list[str]:
"""Split content into chunks at heading boundaries.
Splits on ## headings when content exceeds max_size.
Falls back to paragraph boundaries, then fixed-size splits.
"""
if len(content) <= max_size:
return [content]
chunks: list[str] = []
parts = re.split(r"(\n## )", content)
current = ""
for part in parts:
if len(current) + len(part) > max_size and current:
chunks.append(current.strip())
current = part
else:
current += part
if current.strip():
chunks.append(current.strip())
# If a single chunk is still too large, split on paragraphs
final_chunks: list[str] = []
for chunk in chunks:
if len(chunk) <= max_size:
final_chunks.append(chunk)
else:
paragraphs = chunk.split("\n\n")
para_current = ""
for para in paragraphs:
if len(para_current) + len(para) + 2 > max_size and para_current:
final_chunks.append(para_current.strip())
para_current = para
else:
para_current = para_current + "\n\n" + para if para_current else para
if para_current.strip():
final_chunks.append(para_current.strip())
return final_chunks if final_chunks else [content[:max_size]]
def ingest_file(
archive: MnemosyneArchive,
file_path,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> list:
"""Ingest a single file into the archive.
Extracts title from first markdown heading (or filename).
Large files are chunked at heading boundaries.
Re-ingesting the same unchanged file returns existing entries (dedup via source_ref).
Args:
archive: The MnemosyneArchive to ingest into.
file_path: Path to the file.
source: Source label (default "file").
topics: Topic tags to attach to entries.
max_chunk_size: Maximum characters per chunk before splitting.
Returns:
List of ArchiveEntry objects created (or existing if deduped).
Raises:
FileNotFoundError: If file_path does not exist.
UnicodeDecodeError: If file cannot be decoded as UTF-8.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
stat = path.stat()
source_ref = f"{path.resolve()}:{int(stat.st_mtime)}"
# Check if already ingested (same path + mtime)
existing = [e for e in archive._entries.values() if e.source_ref == source_ref]
if existing:
return existing
content = path.read_text(encoding="utf-8")
if not content.strip():
return []
title = _extract_title(content, fallback=path.stem)
chunks = _chunk_content(content, max_chunk_size)
entries: list = []
for i, chunk in enumerate(chunks):
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1}/{len(chunks)})"
entry = ArchiveEntry(
title=chunk_title,
content=chunk,
source=source,
source_ref=source_ref if len(chunks) == 1 else f"{source_ref}#chunk{i}",
topics=topics or [],
metadata={
"file_path": str(path.resolve()),
"file_name": path.name,
"file_size": stat.st_size,
"file_mtime": stat.st_mtime,
"chunk_index": i,
"total_chunks": len(chunks),
},
)
archive.add(entry)
entries.append(entry)
return entries
def ingest_directory(
archive: MnemosyneArchive,
dir_path,
extensions: Optional[set[str]] = None,
source: str = "file",
topics: Optional[list[str]] = None,
max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
recursive: bool = True,
) -> dict:
"""Ingest all matching files from a directory tree.
Args:
archive: The MnemosyneArchive to ingest into.
dir_path: Root directory to scan.
extensions: File extensions to include (default: .md, .txt).
source: Source label for ingested entries.
topics: Topic tags to attach to all entries.
max_chunk_size: Maximum characters per chunk before splitting.
recursive: Whether to recurse into subdirectories.
Returns:
Dict with keys: files_scanned, files_ingested, entries_added, skipped
"""
root = Path(dir_path)
if not root.is_dir():
raise NotADirectoryError(f"Not a directory: {dir_path}")
exts = extensions or _DEFAULT_EXTENSIONS
stats = {"files_scanned": 0, "files_ingested": 0, "entries_added": 0, "skipped": 0}
pattern = "**/*" if recursive else "*"
for file_path in sorted(root.glob(pattern)):
if not file_path.is_file():
continue
if file_path.suffix.lower() not in exts:
continue
stats["files_scanned"] += 1
try:
entries = ingest_file(archive, file_path, source=source, topics=topics, max_chunk_size=max_chunk_size)
if entries:
stats["files_ingested"] += 1
stats["entries_added"] += len(entries)
else:
stats["skipped"] += 1
except (UnicodeDecodeError, OSError):
stats["skipped"] += 1
return stats

View File

@@ -1,241 +0,0 @@
"""Tests for file-based ingestion pipeline (ingest_file / ingest_directory)."""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.ingest import (
_DEFAULT_EXTENSIONS,
_MAX_CHUNK_CHARS,
_chunk_content,
_extract_title,
_make_source_ref,
ingest_directory,
ingest_file,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_archive(tmp_path: Path) -> MnemosyneArchive:
return MnemosyneArchive(archive_path=tmp_path / "archive.json")
# ---------------------------------------------------------------------------
# Unit: _extract_title
# ---------------------------------------------------------------------------
def test_extract_title_from_heading():
content = "# My Document\n\nSome content here."
assert _extract_title(content, Path("ignored.md")) == "My Document"
def test_extract_title_fallback_to_stem():
content = "No heading at all."
assert _extract_title(content, Path("/docs/my_notes.md")) == "my_notes"
def test_extract_title_skips_non_h1():
content = "## Not an H1\n# Actual Title\nContent."
assert _extract_title(content, Path("x.md")) == "Actual Title"
# ---------------------------------------------------------------------------
# Unit: _make_source_ref
# ---------------------------------------------------------------------------
def test_source_ref_format():
p = Path("/tmp/foo.md")
ref = _make_source_ref(p, 1234567890.9)
assert ref == "file:/tmp/foo.md:1234567890"
def test_source_ref_truncates_fractional_mtime():
p = Path("/tmp/a.txt")
assert _make_source_ref(p, 100.99) == _make_source_ref(p, 100.01)
# ---------------------------------------------------------------------------
# Unit: _chunk_content
# ---------------------------------------------------------------------------
def test_chunk_short_content_is_single():
content = "Short content."
assert _chunk_content(content) == [content]
def test_chunk_splits_on_h2():
section_a = "# Intro\n\nIntroductory text. " + "x" * 100
section_b = "## Section B\n\nBody of section B. " + "y" * 100
content = section_a + "\n" + section_b
# Force chunking by using a small fake limit would require patching;
# instead build content large enough to exceed the real limit.
big_a = "# Intro\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
big_b = "## Section B\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
combined = big_a + "\n" + big_b
chunks = _chunk_content(combined)
assert len(chunks) >= 2
assert any("Section B" in c for c in chunks)
def test_chunk_fixed_window_fallback():
# Content with no ## headings but > MAX_CHUNK_CHARS
content = "word " * (_MAX_CHUNK_CHARS // 5 + 100)
chunks = _chunk_content(content)
assert len(chunks) >= 2
for c in chunks:
assert len(c) <= _MAX_CHUNK_CHARS
# ---------------------------------------------------------------------------
# ingest_file
# ---------------------------------------------------------------------------
def test_ingest_file_returns_entry(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "notes.md"
doc.write_text("# My Notes\n\nHello world.")
entries = ingest_file(archive, doc)
assert len(entries) == 1
assert entries[0].title == "My Notes"
assert entries[0].source == "file"
assert "Hello world" in entries[0].content
def test_ingest_file_uses_stem_when_no_heading(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "raw_log.txt"
doc.write_text("Just some plain text without a heading.")
entries = ingest_file(archive, doc)
assert entries[0].title == "raw_log"
def test_ingest_file_dedup_unchanged(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "doc.md"
doc.write_text("# Title\n\nContent.")
entries1 = ingest_file(archive, doc)
assert archive.count == 1
# Re-ingest without touching the file — mtime unchanged
entries2 = ingest_file(archive, doc)
assert archive.count == 1 # no duplicate
assert entries2[0].id == entries1[0].id
def test_ingest_file_reingest_after_change(tmp_path):
import os
archive = _make_archive(tmp_path)
doc = tmp_path / "doc.md"
doc.write_text("# Title\n\nOriginal content.")
ingest_file(archive, doc)
assert archive.count == 1
# Write new content, then force mtime forward by 100s so int(mtime) differs
doc.write_text("# Title\n\nUpdated content.")
new_mtime = doc.stat().st_mtime + 100
os.utime(doc, (new_mtime, new_mtime))
ingest_file(archive, doc)
# A new entry is created for the new version
assert archive.count == 2
def test_ingest_file_source_ref_contains_path(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "thing.txt"
doc.write_text("Plain text.")
entries = ingest_file(archive, doc)
assert str(doc) in entries[0].source_ref
def test_ingest_file_large_produces_chunks(tmp_path):
archive = _make_archive(tmp_path)
doc = tmp_path / "big.md"
# Build content with clear ## sections large enough to trigger chunking
big_a = "# Doc\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
big_b = "## Part Two\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
doc.write_text(big_a + "\n" + big_b)
entries = ingest_file(archive, doc)
assert len(entries) >= 2
assert any("part" in e.title.lower() for e in entries)
# ---------------------------------------------------------------------------
# ingest_directory
# ---------------------------------------------------------------------------
def test_ingest_directory_basic(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("# Alpha\n\nFirst doc.")
(docs / "b.txt").write_text("Beta plain text.")
(docs / "skip.py").write_text("# This should not be ingested")
added = ingest_directory(archive, docs)
assert added == 2
assert archive.count == 2
def test_ingest_directory_custom_extensions(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("# Alpha")
(docs / "b.py").write_text("No heading — uses stem.")
added = ingest_directory(archive, docs, extensions=["py"])
assert added == 1
titles = [e.title for e in archive._entries.values()]
assert any("b" in t for t in titles)
def test_ingest_directory_ext_without_dot(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "notes.md").write_text("# Notes\n\nContent.")
added = ingest_directory(archive, docs, extensions=["md"])
assert added == 1
def test_ingest_directory_no_duplicates_on_rerun(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "file.md").write_text("# Stable\n\nSame content.")
ingest_directory(archive, docs)
assert archive.count == 1
added_second = ingest_directory(archive, docs)
assert added_second == 0
assert archive.count == 1
def test_ingest_directory_recurses_subdirs(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
sub = docs / "sub"
sub.mkdir(parents=True)
(docs / "top.md").write_text("# Top level")
(sub / "nested.md").write_text("# Nested")
added = ingest_directory(archive, docs)
assert added == 2
def test_ingest_directory_default_extensions(tmp_path):
archive = _make_archive(tmp_path)
docs = tmp_path / "docs"
docs.mkdir()
(docs / "a.md").write_text("markdown")
(docs / "b.txt").write_text("text")
(docs / "c.json").write_text('{"key": "value"}')
(docs / "d.yaml").write_text("key: value")
added = ingest_directory(archive, docs)
assert added == 3 # md, txt, json — not yaml