Files
the-nexus/nexus/mnemosyne/ingest.py
Claude (Opus 4.6) 75f11b4f48
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
[claude] Mnemosyne file-based document ingestion pipeline (#1275) (#1276)
2026-04-12 11:50:16 +00:00

183 lines
5.5 KiB
Python

"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, manual entries, and files.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Optional, Union
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
# File suffixes that ``ingest_directory`` accepts when the caller does not
# pass an explicit ``extensions`` list.
_DEFAULT_EXTENSIONS = [".md", ".txt", ".json"]
# Size threshold (in characters) used by ``_chunk_content``: content at or
# below this length is kept as a single chunk, longer content is split.
_MAX_CHUNK_CHARS = 4000 # ~1000 tokens; split large files into chunks
def _extract_title(content: str, path: Path) -> str:
    """Pick a title for a document.

    The title is the text of the first line that, once surrounding
    whitespace is removed, begins with ``# ``. When no such line exists the
    file stem of ``path`` is used instead.
    """
    # Lazily yield heading texts so scanning stops at the first match.
    heading_texts = (
        candidate[2:].strip()
        for candidate in map(str.strip, content.splitlines())
        if candidate.startswith("# ")
    )
    first_heading = next(heading_texts, None)
    if first_heading is None:
        return path.stem
    return first_heading
def _make_source_ref(path: Path, mtime: float) -> str:
    """Build the reference string identifying one version of a file.

    The reference combines the path with the modification time truncated
    to an integer, so two timestamps that differ only in their fractional
    part produce the same reference.
    """
    whole_seconds = int(mtime)
    return f"file:{path}:{whole_seconds}"
def _chunk_content(content: str, max_chars: Optional[int] = None) -> list[str]:
    """Split content into chunks of at most ``max_chars`` characters.

    Content that already fits is returned as a single chunk. Otherwise the
    text is split before every line that starts with ``## `` and consecutive
    sections are packed together (re-joined with the newline the split
    consumed) for as long as the result still fits. Content without any such
    heading, and any single section that is itself too long, is cut into
    fixed-size character windows.

    Args:
        content: Text to split.
        max_chars: Maximum number of characters per chunk. Defaults to the
            module-level ``_MAX_CHUNK_CHARS`` when omitted.

    Returns:
        The chunks in document order.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1.
    """
    limit = _MAX_CHUNK_CHARS if max_chars is None else max_chars
    if limit < 1:
        raise ValueError("max_chars must be at least 1")
    if len(content) <= limit:
        return [content]

    def _windows(text: str) -> list[str]:
        # Fixed-size slices; the last one may be shorter.
        return [text[i : i + limit] for i in range(0, len(text), limit)]

    # Prefer splitting on ## section headings; the lookahead keeps the
    # heading text at the start of the section it introduces.
    sections = re.split(r"\n(?=## )", content)
    if len(sections) == 1:
        return _windows(content)

    packed: list[str] = []
    current = ""
    for section in sections:
        if not current:
            current = section
        # +1 accounts for the newline re-inserted between joined sections.
        elif len(current) + 1 + len(section) > limit:
            packed.append(current)
            current = section
        else:
            current = f"{current}\n{section}"
    if current:
        packed.append(current)

    # A single section can still exceed the limit; window those so that no
    # chunk is ever larger than requested.
    chunks: list[str] = []
    for piece in packed:
        if len(piece) > limit:
            chunks.extend(_windows(piece))
        else:
            chunks.append(piece)
    return chunks
def ingest_file(
    archive: MnemosyneArchive,
    path: Union[str, Path],
) -> list[ArchiveEntry]:
    """Ingest one file into the archive, one entry per chunk.

    - The path is resolved to an absolute path before use.
    - The entry title comes from the first ``# heading`` in the file, or
      from the filename stem when there is none; multi-chunk files get a
      ``(part N)`` suffix.
    - Deduplication relies on ``source_ref`` (resolved path + mtime): if
      the archive already holds entries whose ``source_ref`` starts with
      this file version's reference, those entries are returned and
      nothing is added.
    - Content is split by ``_chunk_content``; a file that yields a single
      chunk uses the bare reference, otherwise each chunk's reference gets
      a ``:chunk<i>`` suffix.

    Errors from ``Path.stat`` / ``Path.read_text`` (for example a missing
    file) are not caught here.

    Returns the list of ArchiveEntry objects for this file version.
    """
    resolved = Path(path).resolve()
    version_ref = _make_source_ref(resolved, resolved.stat().st_mtime)

    # Short-circuit when this exact file version is already in the archive.
    already_stored: list[ArchiveEntry] = []
    for candidate in archive._entries.values():
        ref = candidate.source_ref
        if ref and ref.startswith(version_ref):
            already_stored.append(candidate)
    if already_stored:
        return already_stored

    # Undecodable bytes are replaced rather than raising.
    text = resolved.read_text(encoding="utf-8", errors="replace")
    base_title = _extract_title(text, resolved)
    pieces = _chunk_content(text)
    total = len(pieces)
    is_single = total == 1

    created: list[ArchiveEntry] = []
    for index, piece in enumerate(pieces):
        if is_single:
            piece_ref = version_ref
            piece_title = base_title
        else:
            piece_ref = f"{version_ref}:chunk{index}"
            piece_title = f"{base_title} (part {index + 1})"
        record = ArchiveEntry(
            title=piece_title,
            content=piece,
            source="file",
            source_ref=piece_ref,
            metadata={
                "file_path": str(resolved),
                "chunk": index,
                "total_chunks": total,
            },
        )
        archive.add(record)
        created.append(record)
    return created
def ingest_directory(
    archive: MnemosyneArchive,
    dir_path: Union[str, Path],
    extensions: Optional[list[str]] = None,
) -> int:
    """Walk a directory tree and ingest all matching files.

    Files are visited in sorted path order and handed to ``ingest_file``.

    Args:
        archive: Archive that receives the entries.
        dir_path: Root directory; it is resolved and searched recursively.
        extensions: File suffixes to accept. Defaults to
            ``[".md", ".txt", ".json"]``. Values may be given with or
            without a leading dot and in any letter case; matching against
            file suffixes is case-insensitive.

    Returns:
        The count of new archive entries created, measured as the growth of
        ``archive.count`` across the ``ingest_file`` calls.
    """
    root = Path(dir_path).resolve()
    if extensions is None:
        wanted = _DEFAULT_EXTENSIONS
    else:
        wanted = [ext if ext.startswith(".") else f".{ext}" for ext in extensions]
    # File suffixes are lower-cased before comparison, so the accepted set
    # must be lower-cased too or ".MD" / "TXT" would never match anything.
    allowed = {ext.lower() for ext in wanted}

    added = 0
    for file_path in sorted(root.rglob("*")):
        if not file_path.is_file() or file_path.suffix.lower() not in allowed:
            continue
        # Count via the archive size so already-ingested files add nothing.
        before = archive.count
        ingest_file(archive, file_path)
        added += archive.count - before
    return added
def ingest_from_mempalace(
    archive: MnemosyneArchive,
    mempalace_entries: list[dict],
) -> int:
    """Ingest entries from a MemPalace export.

    Each dict may carry ``content``, ``metadata`` and ``id``. The ``id`` is
    stored as the entry's ``source_ref`` and used for deduplication: an
    entry whose id is already present in the archive (or appeared earlier in
    the same batch) is skipped. Entries without an id cannot be
    deduplicated and are always added, rather than being treated as
    duplicates of one another.

    The title is ``metadata["title"]`` when present, otherwise the first 80
    characters of the content. Missing or ``None`` ``content`` / ``metadata``
    values are treated as empty.

    Returns the number of entries passed to ``archive.add``.
    """
    # Snapshot the known references once instead of rescanning the whole
    # archive for every incoming entry. Empty refs are excluded on purpose:
    # they do not identify anything.
    known_refs = {
        existing.source_ref
        for existing in archive._entries.values()
        if existing.source_ref
    }
    added = 0
    for mp_entry in mempalace_entries:
        content = mp_entry.get("content") or ""
        metadata = mp_entry.get("metadata") or {}
        source_ref = mp_entry.get("id", "")
        # Skip if already ingested (only meaningful when an id exists).
        if source_ref and source_ref in known_refs:
            continue
        entry = ArchiveEntry(
            title=metadata.get("title", content[:80]),
            content=content,
            source="mempalace",
            source_ref=source_ref,
            topics=metadata.get("topics", []),
            metadata=metadata,
        )
        archive.add(entry)
        if source_ref:
            known_refs.add(source_ref)
        added += 1
    return added
def ingest_event(
    archive: MnemosyneArchive,
    title: str,
    content: str,
    topics: Optional[list[str]] = None,
    source: str = "event",
    metadata: Optional[dict] = None,
) -> ArchiveEntry:
    """Build an entry from the given fields and add it to the archive.

    Omitted (or otherwise falsy) ``topics`` and ``metadata`` are replaced by
    a fresh empty list / dict. Returns whatever ``archive.add`` returns.
    """
    event_topics = topics if topics else []
    event_metadata = metadata if metadata else {}
    record = ArchiveEntry(
        title=title,
        content=content,
        source=source,
        topics=event_topics,
        metadata=event_metadata,
    )
    return archive.add(record)