Compare commits

...

4 Commits

Author SHA1 Message Date
98cdc34a36 feat: add ingest-dir CLI command (#1275)
mnemosyne ingest-dir <path> [--ext md,txt] [--topics topic1,topic2]
2026-04-12 11:51:56 +00:00
63ac52dc24 feat: export ingest_file and ingest_directory 2026-04-12 11:47:55 +00:00
25f6ffc050 feat: add file and directory ingestion pipeline (#1275)
- ingest_file() reads a single file, extracts title from headings, chunks large files
- ingest_directory() walks directory tree, ingests matching files
- Dedup via source_ref (file path + mtime)
- Chunking at heading and paragraph boundaries for large files
2026-04-12 11:47:20 +00:00
0f87258a1e test: verify PUT API works 2026-04-12 11:46:20 +00:00
3 changed files with 215 additions and 4 deletions

View File

@@ -13,7 +13,7 @@ from __future__ import annotations
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.linker import HolographicLinker
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event
from nexus.mnemosyne.ingest import ingest_from_mempalace, ingest_event, ingest_file, ingest_directory
from nexus.mnemosyne.embeddings import (
EmbeddingBackend,
OllamaEmbeddingBackend,
@@ -27,6 +27,8 @@ __all__ = [
"HolographicLinker",
"ingest_from_mempalace",
"ingest_event",
"ingest_file",
"ingest_directory",
"EmbeddingBackend",
"OllamaEmbeddingBackend",
"TfidfEmbeddingBackend",

View File

@@ -8,7 +8,8 @@ Provides: mnemosyne ingest, mnemosyne search, mnemosyne link, mnemosyne stats,
mnemosyne touch, mnemosyne decay, mnemosyne vitality,
mnemosyne fading, mnemosyne vibrant,
mnemosyne snapshot create|list|restore|diff,
mnemosyne resonance
mnemosyne resonance,
mnemosyne ingest-dir
"""
from __future__ import annotations
@@ -19,7 +20,7 @@ import sys
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
from nexus.mnemosyne.ingest import ingest_event
from nexus.mnemosyne.ingest import ingest_event, ingest_file, ingest_directory
def cmd_stats(args):
@@ -65,6 +66,21 @@ def cmd_ingest(args):
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
def cmd_ingest_dir(args):
    """Handle ``mnemosyne ingest-dir``: ingest a directory and print a summary.

    Reads three attributes from *args*:

    * ``path`` -- directory to scan.
    * ``extensions`` -- comma-separated extension list; empty means "use the
      defaults of ``ingest_directory``".
    * ``topics`` -- comma-separated topic tags applied to every entry.

    Prints the scanned / ingested / skipped counters returned by
    ``ingest_directory``.
    """
    archive = MnemosyneArchive()

    # ingest_directory compares against Path.suffix, which always carries a
    # leading dot and is lower-cased there. Accept both "md,txt" and
    # ".md,.txt" (any case, optional spaces) so either spelling works.
    exts = set()
    for raw in (args.extensions or "").split(","):
        ext = raw.strip().lower()
        if not ext:
            continue
        exts.add(ext if ext.startswith(".") else f".{ext}")

    # Drop blank tokens so "a, b," yields ["a", "b"] rather than ["a", " b", ""].
    topics = [t.strip() for t in (args.topics or "").split(",") if t.strip()]

    stats = ingest_directory(
        archive,
        dir_path=args.path,
        extensions=exts or None,
        topics=topics,
    )
    print(f"Scanned: {stats['files_scanned']} files")
    print(f"Ingested: {stats['files_ingested']} files -> {stats['entries_added']} entries")
    print(f"Skipped: {stats['skipped']} files")
def cmd_link(args):
archive = MnemosyneArchive()
entry = archive.get(args.entry_id)
@@ -413,6 +429,12 @@ def main():
i.add_argument("--content", required=True)
i.add_argument("--topics", default="", help="Comma-separated topics")
id = sub.add_parser("ingest-dir", help="Ingest all files from a directory")
id.add_argument("path", help="Directory path to ingest")
id.add_argument("--ext", dest="extensions", default="", help="Comma-separated extensions (default: .md,.txt)")
id.add_argument("--topics", default="", help="Comma-separated topics to tag all entries")
l = sub.add_parser("link", help="Show linked entries")
l.add_argument("entry_id", help="Entry ID (or prefix)")
l.add_argument("-d", "--depth", type=int, default=1)
@@ -532,6 +554,7 @@ def main():
"vibrant": cmd_vibrant,
"resonance": cmd_resonance,
"snapshot": cmd_snapshot,
"ingest-dir": cmd_ingest_dir,
}
dispatch[args.command](args)

View File

@@ -1,15 +1,25 @@
"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, and manual entries.
Supports ingesting from MemPalace, raw events, files, and directories.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
# Default max chunk size in characters (roughly ~2000 tokens)
_DEFAULT_CHUNK_SIZE = 8000
# File extensions recognized for ingestion
_TEXT_EXTENSIONS = {".md", ".txt", ".rst", ".log", ".py", ".js", ".yaml", ".yml", ".json", ".toml", ".cfg", ".ini"}
_DEFAULT_EXTENSIONS = {".md", ".txt"}
def ingest_from_mempalace(
archive: MnemosyneArchive,
@@ -60,3 +70,179 @@ def ingest_event(
metadata=metadata or {},
)
return archive.add(entry)
def _extract_title(content: str, fallback: str = "Untitled") -> str:
"""Extract title from first markdown heading, or use fallback."""
for line in content.split("\n")[:10]:
line = line.strip()
m = re.match(r"^#{1,6}\s+(.+)$", line)
if m:
return m.group(1).strip()
for line in content.split("\n")[:5]:
line = line.strip()
if line and len(line) > 3:
return line[:120]
return fallback
def _pack_segments(segments: list[str], joiner: str, max_size: int) -> list[str]:
    """Greedily join consecutive *segments* with *joiner* up to *max_size* chars.

    Each returned piece is stripped and non-empty. A single segment that is
    itself longer than *max_size* is returned as-is (still oversize); the
    caller decides how to break it down further.
    """
    packed: list[str] = []
    current = ""
    for segment in segments:
        if not current:
            current = segment
        elif len(current) + len(joiner) + len(segment) > max_size:
            piece = current.strip()
            if piece:
                packed.append(piece)
            current = segment
        else:
            current = f"{current}{joiner}{segment}"
    tail = current.strip()
    if tail:
        packed.append(tail)
    return packed


def _chunk_content(content: str, max_size: int = _DEFAULT_CHUNK_SIZE) -> list[str]:
    """Split *content* into chunks of at most *max_size* characters.

    Content that already fits is returned unchanged as a single chunk.
    Otherwise it is split, in order of preference:

    1. at ``## `` heading boundaries (the heading stays with its section),
    2. at blank-line paragraph boundaries,
    3. at fixed ``max_size`` offsets, for a paragraph that is still too long.

    Args:
        content: Text to split.
        max_size: Maximum number of characters per chunk; must be positive.

    Returns:
        Non-empty list of chunks. Chunks produced by splitting are stripped
        of surrounding whitespace.

    Raises:
        ValueError: If *max_size* is not positive.
    """
    if max_size < 1:
        raise ValueError(f"max_size must be positive, got {max_size}")
    if len(content) <= max_size:
        return [content]

    # Split on the newline *before* each "## " heading (lookahead), so the
    # marker is never separated from the section it introduces.
    sections = re.split(r"\n(?=## )", content)

    chunks: list[str] = []
    for section in _pack_segments(sections, "\n", max_size):
        if len(section) <= max_size:
            chunks.append(section)
            continue
        # Section still too large: fall back to paragraph boundaries.
        for block in _pack_segments(section.split("\n\n"), "\n\n", max_size):
            if len(block) <= max_size:
                chunks.append(block)
                continue
            # Last resort: a single paragraph longer than max_size is cut at
            # fixed offsets so no chunk exceeds the limit.
            for start in range(0, len(block), max_size):
                piece = block[start:start + max_size].strip()
                if piece:
                    chunks.append(piece)

    # Only reachable for oversize whitespace-only content.
    return chunks if chunks else [content[:max_size]]
def ingest_file(
    archive: MnemosyneArchive,
    file_path,
    source: str = "file",
    topics: Optional[list[str]] = None,
    max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
) -> list:
    """Ingest a single file into the archive.

    Extracts the title from the first markdown heading (or falls back to the
    file name without its suffix). Files larger than *max_chunk_size* are
    split into several entries titled ``"<title> (part i/n)"``.

    Re-ingesting the same unchanged file returns the existing entries instead
    of adding new ones. "Unchanged" means same resolved path and same
    modification time truncated to whole seconds.

    Args:
        archive: The MnemosyneArchive to ingest into.
        file_path: Path to the file.
        source: Source label (default "file").
        topics: Topic tags to attach to entries.
        max_chunk_size: Maximum characters per chunk before splitting.

    Returns:
        List of ArchiveEntry objects created (or existing if deduped).
        Empty list when the file contains only whitespace.

    Raises:
        FileNotFoundError: If file_path does not exist.
        UnicodeDecodeError: If file cannot be decoded as UTF-8.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    stat = path.stat()
    resolved = path.resolve()
    # mtime is truncated to seconds, so an edit within the same second as the
    # previous ingest is not detected as a change.
    source_ref = f"{resolved}:{int(stat.st_mtime)}"

    # Check if already ingested (same path + mtime). Multi-chunk files are
    # stored under "<source_ref>#chunk<i>", so match that form too; otherwise
    # every re-ingest of a large file would duplicate all of its chunks.
    chunk_prefix = f"{source_ref}#chunk"
    existing = [
        e
        for e in archive._entries.values()
        if e.source_ref == source_ref
        or (isinstance(e.source_ref, str) and e.source_ref.startswith(chunk_prefix))
    ]
    if existing:
        return existing

    content = path.read_text(encoding="utf-8")
    if not content.strip():
        return []

    title = _extract_title(content, fallback=path.stem)
    chunks = _chunk_content(content, max_chunk_size)
    total = len(chunks)

    entries: list = []
    for i, chunk in enumerate(chunks):
        chunk_title = title if total == 1 else f"{title} (part {i + 1}/{total})"
        entry = ArchiveEntry(
            title=chunk_title,
            content=chunk,
            source=source,
            source_ref=source_ref if total == 1 else f"{source_ref}#chunk{i}",
            # Give each entry its own list so entries never share (and
            # accidentally co-mutate) the caller's topics list.
            topics=list(topics) if topics else [],
            metadata={
                "file_path": str(resolved),
                "file_name": path.name,
                "file_size": stat.st_size,
                "file_mtime": stat.st_mtime,
                "chunk_index": i,
                "total_chunks": total,
            },
        )
        archive.add(entry)
        entries.append(entry)
    return entries
def ingest_directory(
    archive: MnemosyneArchive,
    dir_path,
    extensions: Optional[set[str]] = None,
    source: str = "file",
    topics: Optional[list[str]] = None,
    max_chunk_size: int = _DEFAULT_CHUNK_SIZE,
    recursive: bool = True,
) -> dict:
    """Ingest all matching files from a directory tree.

    Files are visited in sorted path order. A file that cannot be read or
    decoded is counted as skipped rather than aborting the whole run.

    Args:
        archive: The MnemosyneArchive to ingest into.
        dir_path: Root directory to scan.
        extensions: File extensions to include (default: .md, .txt). Matching
            is case-insensitive and the leading dot is optional, so ``"md"``,
            ``".md"`` and ``".MD"`` are equivalent.
        source: Source label for ingested entries.
        topics: Topic tags to attach to all entries.
        max_chunk_size: Maximum characters per chunk before splitting.
        recursive: Whether to recurse into subdirectories.

    Returns:
        Dict with keys: files_scanned, files_ingested, entries_added, skipped.
        ``skipped`` counts files that produced no entries as well as files
        that raised ``UnicodeDecodeError`` or ``OSError``.

    Raises:
        NotADirectoryError: If dir_path is not a directory.
    """
    root = Path(dir_path)
    if not root.is_dir():
        raise NotADirectoryError(f"Not a directory: {dir_path}")

    if extensions:
        # Path.suffix always has a leading dot and is lower-cased below, so
        # bring caller-supplied values into the same shape. An empty string
        # is kept as-is: it matches files without any suffix.
        cleaned = (ext.strip().lower() for ext in extensions)
        exts = {
            ext if not ext or ext.startswith(".") else f".{ext}"
            for ext in cleaned
        }
    else:
        exts = _DEFAULT_EXTENSIONS

    stats = {"files_scanned": 0, "files_ingested": 0, "entries_added": 0, "skipped": 0}
    pattern = "**/*" if recursive else "*"

    for file_path in sorted(root.glob(pattern)):
        if not file_path.is_file() or file_path.suffix.lower() not in exts:
            continue
        stats["files_scanned"] += 1
        try:
            entries = ingest_file(
                archive,
                file_path,
                source=source,
                topics=topics,
                max_chunk_size=max_chunk_size,
            )
        except (UnicodeDecodeError, OSError):
            # Best effort: unreadable / non-UTF-8 files must not stop the scan.
            stats["skipped"] += 1
            continue

        if entries:
            # NOTE(review): ingest_file may return already-existing entries
            # for an unchanged file, so these counters can include files that
            # were deduplicated rather than newly added -- confirm whether
            # that is the intended meaning of "entries_added".
            stats["files_ingested"] += 1
            stats["entries_added"] += len(entries)
        else:
            stats["skipped"] += 1
    return stats