Files
the-nexus/nexus/mnemosyne/ingest.py
Alexander Whitestone b9f1602067
Some checks failed
Deploy Nexus / deploy (push) Failing after 3s
Staging Verification Gate / verify-staging (push) Failing after 3s
merge: Mnemosyne Phase 1 — Living Holographic Archive
Co-authored-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
Co-committed-by: Alexander Whitestone <alexander@alexanderwhitestone.com>
2026-04-11 12:10:14 +00:00

63 lines
1.6 KiB
Python

"""Ingestion pipeline — feeds data into the archive.
Supports ingesting from MemPalace, raw events, and manual entries.
"""
from __future__ import annotations
from typing import Optional
from nexus.mnemosyne.archive import MnemosyneArchive
from nexus.mnemosyne.entry import ArchiveEntry
def ingest_from_mempalace(
archive: MnemosyneArchive,
mempalace_entries: list[dict],
) -> int:
"""Ingest entries from a MemPalace export.
Each dict should have at least: content, metadata (optional).
Returns count of new entries added.
"""
added = 0
for mp_entry in mempalace_entries:
content = mp_entry.get("content", "")
metadata = mp_entry.get("metadata", {})
source_ref = mp_entry.get("id", "")
# Skip if already ingested
if any(e.source_ref == source_ref for e in archive._entries.values()):
continue
entry = ArchiveEntry(
title=metadata.get("title", content[:80]),
content=content,
source="mempalace",
source_ref=source_ref,
topics=metadata.get("topics", []),
metadata=metadata,
)
archive.add(entry)
added += 1
return added
def ingest_event(
archive: MnemosyneArchive,
title: str,
content: str,
topics: Optional[list[str]] = None,
source: str = "event",
metadata: Optional[dict] = None,
) -> ArchiveEntry:
"""Ingest a single event into the archive."""
entry = ArchiveEntry(
title=title,
content=content,
source=source,
topics=topics or [],
metadata=metadata or {},
)
return archive.add(entry)