242 lines
7.8 KiB
Python
242 lines
7.8 KiB
Python
"""Tests for file-based ingestion pipeline (ingest_file / ingest_directory)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from nexus.mnemosyne.archive import MnemosyneArchive
|
|
from nexus.mnemosyne.ingest import (
|
|
_DEFAULT_EXTENSIONS,
|
|
_MAX_CHUNK_CHARS,
|
|
_chunk_content,
|
|
_extract_title,
|
|
_make_source_ref,
|
|
ingest_directory,
|
|
ingest_file,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _make_archive(tmp_path: Path) -> MnemosyneArchive:
    """Build an archive whose backing JSON file lives inside *tmp_path*."""
    backing_file = tmp_path / "archive.json"
    return MnemosyneArchive(archive_path=backing_file)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit: _extract_title
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_extract_title_from_heading():
    """An H1 heading on the first line becomes the document title."""
    text = "# My Document\n\nSome content here."
    title = _extract_title(text, Path("ignored.md"))
    assert title == "My Document"
|
|
|
|
|
|
def test_extract_title_fallback_to_stem():
    """Without any heading, the filename stem is used as the title."""
    title = _extract_title("No heading at all.", Path("/docs/my_notes.md"))
    assert title == "my_notes"
|
|
|
|
|
|
def test_extract_title_skips_non_h1():
    """Only a true H1 (single '#') counts; '##' headings are passed over."""
    text = "## Not an H1\n# Actual Title\nContent."
    title = _extract_title(text, Path("x.md"))
    assert title == "Actual Title"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit: _make_source_ref
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_source_ref_format():
    """Refs look like 'file:<path>:<mtime>' with the fraction truncated."""
    path = Path("/tmp/foo.md")
    assert _make_source_ref(path, 1234567890.9) == "file:/tmp/foo.md:1234567890"
|
|
|
|
|
|
def test_source_ref_truncates_fractional_mtime():
    """Two mtimes within the same whole second yield identical refs."""
    path = Path("/tmp/a.txt")
    ref_high = _make_source_ref(path, 100.99)
    ref_low = _make_source_ref(path, 100.01)
    assert ref_high == ref_low
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit: _chunk_content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_chunk_short_content_is_single():
    """Content under the size limit comes back as exactly one chunk."""
    text = "Short content."
    assert _chunk_content(text) == [text]
|
|
|
|
|
|
def test_chunk_splits_on_h2():
    """Oversized content is split at '## ' section boundaries.

    Two sections each sized just under ``_MAX_CHUNK_CHARS`` guarantee the
    combined document exceeds the real limit, so no monkeypatching of the
    constant is needed; the H2 heading must survive in one of the chunks.
    """
    # (The earlier small section_a/section_b fixtures were dead code —
    # only the near-limit sections below actually exercise the chunker.)
    big_a = "# Intro\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
    big_b = "## Section B\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
    combined = big_a + "\n" + big_b
    chunks = _chunk_content(combined)
    assert len(chunks) >= 2
    assert any("Section B" in c for c in chunks)
|
|
|
|
|
|
def test_chunk_fixed_window_fallback():
    """Without '##' headings, oversized content splits on fixed windows."""
    # No H2 headings anywhere, but well past the chunk-size limit.
    text = "word " * (_MAX_CHUNK_CHARS // 5 + 100)
    chunks = _chunk_content(text)
    assert len(chunks) >= 2
    assert all(len(chunk) <= _MAX_CHUNK_CHARS for chunk in chunks)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ingest_file
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_ingest_file_returns_entry(tmp_path):
    """A small markdown file yields one entry titled from its H1."""
    archive = _make_archive(tmp_path)
    doc = tmp_path / "notes.md"
    doc.write_text("# My Notes\n\nHello world.")
    entries = ingest_file(archive, doc)
    assert len(entries) == 1
    entry = entries[0]
    assert entry.title == "My Notes"
    assert entry.source == "file"
    assert "Hello world" in entry.content
|
|
|
|
|
|
def test_ingest_file_uses_stem_when_no_heading(tmp_path):
    """Plain text without an H1 falls back to the filename stem."""
    archive = _make_archive(tmp_path)
    source = tmp_path / "raw_log.txt"
    source.write_text("Just some plain text without a heading.")
    result = ingest_file(archive, source)
    assert result[0].title == "raw_log"
|
|
|
|
|
|
def test_ingest_file_dedup_unchanged(tmp_path):
    """Re-ingesting an untouched file neither duplicates nor re-creates it."""
    archive = _make_archive(tmp_path)
    doc = tmp_path / "doc.md"
    doc.write_text("# Title\n\nContent.")
    first_pass = ingest_file(archive, doc)
    assert archive.count == 1

    # The file is untouched, so its mtime-derived source ref is identical
    # and the second pass must hand back the already-stored entry.
    second_pass = ingest_file(archive, doc)
    assert archive.count == 1  # no duplicate
    assert second_pass[0].id == first_pass[0].id
|
|
|
|
|
|
def test_ingest_file_reingest_after_change(tmp_path):
    """Changing both content and mtime stores a second, versioned entry."""
    import os

    archive = _make_archive(tmp_path)
    doc = tmp_path / "doc.md"
    doc.write_text("# Title\n\nOriginal content.")
    ingest_file(archive, doc)
    assert archive.count == 1

    # Rewrite the file, then push the mtime 100 s forward so that the
    # integer-truncated mtime embedded in the source ref is different.
    doc.write_text("# Title\n\nUpdated content.")
    bumped = doc.stat().st_mtime + 100
    os.utime(doc, (bumped, bumped))

    ingest_file(archive, doc)
    # The changed version lands as a fresh entry alongside the old one.
    assert archive.count == 2
|
|
|
|
|
|
def test_ingest_file_source_ref_contains_path(tmp_path):
    """The entry's source_ref embeds the path of the ingested file."""
    archive = _make_archive(tmp_path)
    source = tmp_path / "thing.txt"
    source.write_text("Plain text.")
    result = ingest_file(archive, source)
    assert str(source) in result[0].source_ref
|
|
|
|
|
|
def test_ingest_file_large_produces_chunks(tmp_path):
    """A document larger than the chunk limit ingests as multiple entries."""
    archive = _make_archive(tmp_path)
    doc = tmp_path / "big.md"
    # Two near-limit sections force the chunker to split on the H2.
    intro = "# Doc\n\n" + "a" * (_MAX_CHUNK_CHARS - 50)
    part_two = "## Part Two\n\n" + "b" * (_MAX_CHUNK_CHARS - 50)
    doc.write_text(intro + "\n" + part_two)
    entries = ingest_file(archive, doc)
    assert len(entries) >= 2
    assert any("part" in entry.title.lower() for entry in entries)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ingest_directory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_ingest_directory_basic(tmp_path):
    """Only files with default extensions are picked up; .py is skipped."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    docs.mkdir()
    fixtures = {
        "a.md": "# Alpha\n\nFirst doc.",
        "b.txt": "Beta plain text.",
        "skip.py": "# This should not be ingested",
    }
    for name, body in fixtures.items():
        (docs / name).write_text(body)
    added = ingest_directory(archive, docs)
    assert added == 2
    assert archive.count == 2
|
|
|
|
|
|
def test_ingest_directory_custom_extensions(tmp_path):
    """An explicit extensions list overrides the defaults entirely."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    docs.mkdir()
    (docs / "a.md").write_text("# Alpha")
    (docs / "b.py").write_text("No heading — uses stem.")
    added = ingest_directory(archive, docs, extensions=["py"])
    assert added == 1
    # Peek at stored titles; the .py file has no H1 so its stem is used.
    stored_titles = [entry.title for entry in archive._entries.values()]
    assert any("b" in title for title in stored_titles)
|
|
|
|
|
|
def test_ingest_directory_ext_without_dot(tmp_path):
    """Extensions may be given without a leading dot ('md', not '.md')."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    docs.mkdir()
    (docs / "notes.md").write_text("# Notes\n\nContent.")
    assert ingest_directory(archive, docs, extensions=["md"]) == 1
|
|
|
|
|
|
def test_ingest_directory_no_duplicates_on_rerun(tmp_path):
    """Re-running ingest over unchanged files adds nothing new."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    docs.mkdir()
    (docs / "file.md").write_text("# Stable\n\nSame content.")
    ingest_directory(archive, docs)
    assert archive.count == 1

    # Second pass: identical content and mtimes, so everything dedups.
    assert ingest_directory(archive, docs) == 0
    assert archive.count == 1
|
|
|
|
|
|
def test_ingest_directory_recurses_subdirs(tmp_path):
    """Files inside nested subdirectories are discovered recursively."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    nested = docs / "sub"
    nested.mkdir(parents=True)
    (docs / "top.md").write_text("# Top level")
    (nested / "nested.md").write_text("# Nested")
    assert ingest_directory(archive, docs) == 2
|
|
|
|
|
|
def test_ingest_directory_default_extensions(tmp_path):
    """The default extension set covers md/txt/json but not yaml."""
    archive = _make_archive(tmp_path)
    docs = tmp_path / "docs"
    docs.mkdir()
    samples = {
        "a.md": "markdown",
        "b.txt": "text",
        "c.json": '{"key": "value"}',
        "d.yaml": "key: value",
    }
    for filename, body in samples.items():
        (docs / filename).write_text(body)
    added = ingest_directory(archive, docs)
    assert added == 3  # md, txt, json — not yaml
|