feat(mnemosyne): add nexus/mnemosyne/ingest.py

Part of #1216 — Project Mnemosyne Phase 1 Foundation
This commit is contained in:
2026-04-11 09:52:10 +00:00
parent 01f3e7fba0
commit e16b513693

62
nexus/mnemosyne/ingest.py Normal file
View File

@@ -0,0 +1,62 @@
"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, and manual entries.
"""
from __future__ import annotations
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def ingest_from_mempalace(
archive: MnemosyneArchive,
mempalace_entries: list[dict],
) -> int:
"""Ingest entries from a MemPalace export.
Each dict should have at least: content, metadata (optional).
Returns count of new entries added.
"""
added = 0
for mp_entry in mempalace_entries:
content = mp_entry.get("content", "")
metadata = mp_entry.get("metadata", {})
source_ref = mp_entry.get("id", "")
# Skip if already ingested
if any(e.source_ref == source_ref for e in archive._entries.values()):
continue
entry = ArchiveEntry(
title=metadata.get("title", content[:80]),
content=content,
source="mempalace",
source_ref=source_ref,
topics=metadata.get("topics", []),
metadata=metadata,
)
archive.add(entry)
added += 1
return added
def ingest_event(
archive: MnemosyneArchive,
title: str,
content: str,
topics: Optional[list[str]] = None,
source: str = "event",
metadata: Optional[dict] = None,
) -> ArchiveEntry:
"""Ingest a single event into the archive."""
entry = ArchiveEntry(
title=title,
content=content,
source=source,
topics=topics or [],
metadata=metadata or {},
)
return archive.add(entry)