feat(mnemosyne): add file-based document ingestion pipeline
Implements ingest_file() and ingest_directory() in ingest.py: - ingest_file(archive, path): reads a single file, extracts title from first # heading (or filename stem), deduplicates via source_ref (absolute path + mtime), and chunks large files on ## headings or fixed character windows. - ingest_directory(archive, dir_path, extensions=None): walks a directory tree and ingests all matching files (default: .md, .txt, .json), returning the count of new entries added. Also adds `mnemosyne ingest-dir <path> [--ext md,txt]` CLI command and 20 unit tests covering dedup, chunking, title extraction, and directory traversal. Fixes #1275 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -19,7 +19,7 @@ import sys
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
from nexus.mnemosyne.ingest import ingest_event
|
||||
from nexus.mnemosyne.ingest import ingest_event, ingest_directory
|
||||
|
||||
|
||||
def cmd_stats(args):
|
||||
@@ -65,6 +65,13 @@ def cmd_ingest(args):
|
||||
print(f"Ingested: [{entry.id[:8]}] {entry.title} ({len(entry.links)} links)")
|
||||
|
||||
|
||||
def cmd_ingest_dir(args):
|
||||
archive = MnemosyneArchive()
|
||||
ext = [e.strip() for e in args.ext.split(",")] if args.ext else None
|
||||
added = ingest_directory(archive, args.path, extensions=ext)
|
||||
print(f"Ingested {added} new entries from {args.path}")
|
||||
|
||||
|
||||
def cmd_link(args):
|
||||
archive = MnemosyneArchive()
|
||||
entry = archive.get(args.entry_id)
|
||||
@@ -413,6 +420,10 @@ def main():
|
||||
i.add_argument("--content", required=True)
|
||||
i.add_argument("--topics", default="", help="Comma-separated topics")
|
||||
|
||||
id_ = sub.add_parser("ingest-dir", help="Ingest a directory of files")
|
||||
id_.add_argument("path", help="Directory to ingest")
|
||||
id_.add_argument("--ext", default="", help="Comma-separated extensions (default: md,txt,json)")
|
||||
|
||||
l = sub.add_parser("link", help="Show linked entries")
|
||||
l.add_argument("entry_id", help="Entry ID (or prefix)")
|
||||
l.add_argument("-d", "--depth", type=int, default=1)
|
||||
@@ -510,6 +521,7 @@ def main():
|
||||
"stats": cmd_stats,
|
||||
"search": cmd_search,
|
||||
"ingest": cmd_ingest,
|
||||
"ingest-dir": cmd_ingest_dir,
|
||||
"link": cmd_link,
|
||||
"topics": cmd_topics,
|
||||
"remove": cmd_remove,
|
||||
|
||||
@@ -1,15 +1,135 @@
|
||||
"""Ingestion pipeline — feeds data into the archive.
|
||||
|
||||
Supports ingesting from MemPalace, raw events, and manual entries.
|
||||
Supports ingesting from MemPalace, raw events, manual entries, and files.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.entry import ArchiveEntry
|
||||
|
||||
_DEFAULT_EXTENSIONS = [".md", ".txt", ".json"]
|
||||
_MAX_CHUNK_CHARS = 4000 # ~1000 tokens; split large files into chunks
|
||||
|
||||
|
||||
def _extract_title(content: str, path: Path) -> str:
|
||||
"""Return first # heading, or the file stem if none found."""
|
||||
for line in content.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("# "):
|
||||
return stripped[2:].strip()
|
||||
return path.stem
|
||||
|
||||
|
||||
def _make_source_ref(path: Path, mtime: float) -> str:
|
||||
"""Stable identifier for a specific version of a file."""
|
||||
return f"file:{path}:{int(mtime)}"
|
||||
|
||||
|
||||
def _chunk_content(content: str) -> list[str]:
|
||||
"""Split content into chunks at ## headings, falling back to fixed windows."""
|
||||
if len(content) <= _MAX_CHUNK_CHARS:
|
||||
return [content]
|
||||
|
||||
# Prefer splitting on ## section headings
|
||||
parts = re.split(r"\n(?=## )", content)
|
||||
if len(parts) > 1:
|
||||
chunks: list[str] = []
|
||||
current = ""
|
||||
for part in parts:
|
||||
if current and len(current) + len(part) > _MAX_CHUNK_CHARS:
|
||||
chunks.append(current)
|
||||
current = part
|
||||
else:
|
||||
current = (current + "\n" + part) if current else part
|
||||
if current:
|
||||
chunks.append(current)
|
||||
return chunks
|
||||
|
||||
# Fixed-window fallback
|
||||
return [content[i : i + _MAX_CHUNK_CHARS] for i in range(0, len(content), _MAX_CHUNK_CHARS)]
|
||||
|
||||
|
||||
def ingest_file(
|
||||
archive: MnemosyneArchive,
|
||||
path: Union[str, Path],
|
||||
) -> list[ArchiveEntry]:
|
||||
"""Ingest a single file into the archive.
|
||||
|
||||
- Title is taken from the first ``# heading`` or the filename stem.
|
||||
- Deduplication is via ``source_ref`` (absolute path + mtime); an
|
||||
unchanged file is skipped and its existing entries are returned.
|
||||
- Files over ``_MAX_CHUNK_CHARS`` are split on ``## `` headings (or
|
||||
fixed character windows as a fallback).
|
||||
|
||||
Returns a list of ArchiveEntry objects (one per chunk).
|
||||
"""
|
||||
path = Path(path).resolve()
|
||||
mtime = path.stat().st_mtime
|
||||
base_ref = _make_source_ref(path, mtime)
|
||||
|
||||
# Return existing entries if this file version was already ingested
|
||||
existing = [e for e in archive._entries.values() if e.source_ref and e.source_ref.startswith(base_ref)]
|
||||
if existing:
|
||||
return existing
|
||||
|
||||
content = path.read_text(encoding="utf-8", errors="replace")
|
||||
title = _extract_title(content, path)
|
||||
chunks = _chunk_content(content)
|
||||
|
||||
entries: list[ArchiveEntry] = []
|
||||
for i, chunk in enumerate(chunks):
|
||||
chunk_ref = base_ref if len(chunks) == 1 else f"{base_ref}:chunk{i}"
|
||||
chunk_title = title if len(chunks) == 1 else f"{title} (part {i + 1})"
|
||||
entry = ArchiveEntry(
|
||||
title=chunk_title,
|
||||
content=chunk,
|
||||
source="file",
|
||||
source_ref=chunk_ref,
|
||||
metadata={
|
||||
"file_path": str(path),
|
||||
"chunk": i,
|
||||
"total_chunks": len(chunks),
|
||||
},
|
||||
)
|
||||
archive.add(entry)
|
||||
entries.append(entry)
|
||||
return entries
|
||||
|
||||
|
||||
def ingest_directory(
|
||||
archive: MnemosyneArchive,
|
||||
dir_path: Union[str, Path],
|
||||
extensions: Optional[list[str]] = None,
|
||||
) -> int:
|
||||
"""Walk a directory tree and ingest all matching files.
|
||||
|
||||
``extensions`` defaults to ``[".md", ".txt", ".json"]``.
|
||||
Values may be given with or without a leading dot.
|
||||
|
||||
Returns the count of new archive entries created.
|
||||
"""
|
||||
dir_path = Path(dir_path).resolve()
|
||||
if extensions is None:
|
||||
exts = _DEFAULT_EXTENSIONS
|
||||
else:
|
||||
exts = [e if e.startswith(".") else f".{e}" for e in extensions]
|
||||
|
||||
added = 0
|
||||
for file_path in sorted(dir_path.rglob("*")):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
if file_path.suffix.lower() not in exts:
|
||||
continue
|
||||
before = archive.count
|
||||
ingest_file(archive, file_path)
|
||||
added += archive.count - before
|
||||
return added
|
||||
|
||||
|
||||
def ingest_from_mempalace(
|
||||
archive: MnemosyneArchive,
|
||||
|
||||
241
nexus/mnemosyne/tests/test_ingest_file.py
Normal file
241
nexus/mnemosyne/tests/test_ingest_file.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""Tests for file-based ingestion pipeline (ingest_file / ingest_directory)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from nexus.mnemosyne.archive import MnemosyneArchive
|
||||
from nexus.mnemosyne.ingest import (
|
||||
_DEFAULT_EXTENSIONS,
|
||||
_MAX_CHUNK_CHARS,
|
||||
_chunk_content,
|
||||
_extract_title,
|
||||
_make_source_ref,
|
||||
ingest_directory,
|
||||
ingest_file,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_archive(tmp_path: Path) -> MnemosyneArchive:
|
||||
return MnemosyneArchive(archive_path=tmp_path / "archive.json")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit: _extract_title
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_extract_title_from_heading():
|
||||
content = "# My Document\n\nSome content here."
|
||||
assert _extract_title(content, Path("ignored.md")) == "My Document"
|
||||
|
||||
|
||||
def test_extract_title_fallback_to_stem():
|
||||
content = "No heading at all."
|
||||
assert _extract_title(content, Path("/docs/my_notes.md")) == "my_notes"
|
||||
|
||||
|
||||
def test_extract_title_skips_non_h1():
|
||||
content = "## Not an H1\n# Actual Title\nContent."
|
||||
assert _extract_title(content, Path("x.md")) == "Actual Title"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit: _make_source_ref
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_source_ref_format():
|
||||
p = Path("/tmp/foo.md")
|
||||
ref = _make_source_ref(p, 1234567890.9)
|
||||
assert ref == "file:/tmp/foo.md:1234567890"
|
||||
|
||||
|
||||
def test_source_ref_truncates_fractional_mtime():
|
||||
p = Path("/tmp/a.txt")
|
||||
assert _make_source_ref(p, 100.99) == _make_source_ref(p, 100.01)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit: _chunk_content
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_chunk_short_content_is_single():
|
||||
content = "Short content."
|
||||
assert _chunk_content(content) == [content]
|
||||
|
||||
|
||||
def test_chunk_splits_on_h2():
|
||||
section_a = "# Intro\n\nIntroductory text. " + "x" * 100
|
||||
section_b = "## Section B\n\nBody of section B. " + "y" * 100
|
||||
content = section_a + "\n" + section_b
|
||||
# Force chunking by using a small fake limit would require patching;
|
||||
# instead build content large enough to exceed the real limit.
|
||||
big_a = "# Intro\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
|
||||
big_b = "## Section B\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
|
||||
combined = big_a + "\n" + big_b
|
||||
chunks = _chunk_content(combined)
|
||||
assert len(chunks) >= 2
|
||||
assert any("Section B" in c for c in chunks)
|
||||
|
||||
|
||||
def test_chunk_fixed_window_fallback():
|
||||
# Content with no ## headings but > MAX_CHUNK_CHARS
|
||||
content = "word " * (_MAX_CHUNK_CHARS // 5 + 100)
|
||||
chunks = _chunk_content(content)
|
||||
assert len(chunks) >= 2
|
||||
for c in chunks:
|
||||
assert len(c) <= _MAX_CHUNK_CHARS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ingest_file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ingest_file_returns_entry(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "notes.md"
|
||||
doc.write_text("# My Notes\n\nHello world.")
|
||||
entries = ingest_file(archive, doc)
|
||||
assert len(entries) == 1
|
||||
assert entries[0].title == "My Notes"
|
||||
assert entries[0].source == "file"
|
||||
assert "Hello world" in entries[0].content
|
||||
|
||||
|
||||
def test_ingest_file_uses_stem_when_no_heading(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "raw_log.txt"
|
||||
doc.write_text("Just some plain text without a heading.")
|
||||
entries = ingest_file(archive, doc)
|
||||
assert entries[0].title == "raw_log"
|
||||
|
||||
|
||||
def test_ingest_file_dedup_unchanged(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "doc.md"
|
||||
doc.write_text("# Title\n\nContent.")
|
||||
entries1 = ingest_file(archive, doc)
|
||||
assert archive.count == 1
|
||||
|
||||
# Re-ingest without touching the file — mtime unchanged
|
||||
entries2 = ingest_file(archive, doc)
|
||||
assert archive.count == 1 # no duplicate
|
||||
assert entries2[0].id == entries1[0].id
|
||||
|
||||
|
||||
def test_ingest_file_reingest_after_change(tmp_path):
|
||||
import os
|
||||
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "doc.md"
|
||||
doc.write_text("# Title\n\nOriginal content.")
|
||||
ingest_file(archive, doc)
|
||||
assert archive.count == 1
|
||||
|
||||
# Write new content, then force mtime forward by 100s so int(mtime) differs
|
||||
doc.write_text("# Title\n\nUpdated content.")
|
||||
new_mtime = doc.stat().st_mtime + 100
|
||||
os.utime(doc, (new_mtime, new_mtime))
|
||||
|
||||
ingest_file(archive, doc)
|
||||
# A new entry is created for the new version
|
||||
assert archive.count == 2
|
||||
|
||||
|
||||
def test_ingest_file_source_ref_contains_path(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "thing.txt"
|
||||
doc.write_text("Plain text.")
|
||||
entries = ingest_file(archive, doc)
|
||||
assert str(doc) in entries[0].source_ref
|
||||
|
||||
|
||||
def test_ingest_file_large_produces_chunks(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
doc = tmp_path / "big.md"
|
||||
# Build content with clear ## sections large enough to trigger chunking
|
||||
big_a = "# Doc\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
|
||||
big_b = "## Part Two\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
|
||||
doc.write_text(big_a + "\n" + big_b)
|
||||
entries = ingest_file(archive, doc)
|
||||
assert len(entries) >= 2
|
||||
assert any("part" in e.title.lower() for e in entries)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ingest_directory
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ingest_directory_basic(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
docs.mkdir()
|
||||
(docs / "a.md").write_text("# Alpha\n\nFirst doc.")
|
||||
(docs / "b.txt").write_text("Beta plain text.")
|
||||
(docs / "skip.py").write_text("# This should not be ingested")
|
||||
added = ingest_directory(archive, docs)
|
||||
assert added == 2
|
||||
assert archive.count == 2
|
||||
|
||||
|
||||
def test_ingest_directory_custom_extensions(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
docs.mkdir()
|
||||
(docs / "a.md").write_text("# Alpha")
|
||||
(docs / "b.py").write_text("No heading — uses stem.")
|
||||
added = ingest_directory(archive, docs, extensions=["py"])
|
||||
assert added == 1
|
||||
titles = [e.title for e in archive._entries.values()]
|
||||
assert any("b" in t for t in titles)
|
||||
|
||||
|
||||
def test_ingest_directory_ext_without_dot(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
docs.mkdir()
|
||||
(docs / "notes.md").write_text("# Notes\n\nContent.")
|
||||
added = ingest_directory(archive, docs, extensions=["md"])
|
||||
assert added == 1
|
||||
|
||||
|
||||
def test_ingest_directory_no_duplicates_on_rerun(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
docs.mkdir()
|
||||
(docs / "file.md").write_text("# Stable\n\nSame content.")
|
||||
ingest_directory(archive, docs)
|
||||
assert archive.count == 1
|
||||
|
||||
added_second = ingest_directory(archive, docs)
|
||||
assert added_second == 0
|
||||
assert archive.count == 1
|
||||
|
||||
|
||||
def test_ingest_directory_recurses_subdirs(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
sub = docs / "sub"
|
||||
sub.mkdir(parents=True)
|
||||
(docs / "top.md").write_text("# Top level")
|
||||
(sub / "nested.md").write_text("# Nested")
|
||||
added = ingest_directory(archive, docs)
|
||||
assert added == 2
|
||||
|
||||
|
||||
def test_ingest_directory_default_extensions(tmp_path):
|
||||
archive = _make_archive(tmp_path)
|
||||
docs = tmp_path / "docs"
|
||||
docs.mkdir()
|
||||
(docs / "a.md").write_text("markdown")
|
||||
(docs / "b.txt").write_text("text")
|
||||
(docs / "c.json").write_text('{"key": "value"}')
|
||||
(docs / "d.yaml").write_text("key: value")
|
||||
added = ingest_directory(archive, docs)
|
||||
assert added == 3 # md, txt, json — not yaml
|
||||
Reference in New Issue
Block a user