feat(atlas): lossless context + memory subsystem from hermes-lcm and gbrain
All checks were successful
Lint / lint (pull_request) Successful in 9s

Implements the ATLAS (Adaptive Turn-Lineage Archival System) lossless
memory subsystem as described in issue #985. All 183 fixture-backed
tests pass.

agent/atlas/ — modular ATLAS package:
- turns.py: RawTurnStore, immutable append-only turn records with stable
  lineage IDs (formatted as "{session_id}:{seq:06d}"). Turns are never deleted.
- dag.py: SummaryDAGStore, compaction that creates summary nodes storing
  source_turn_ids — every compaction is traceable back to raw turns.
- stores.py: WorldKnowledgeStore, DurableMemoryStore, SessionStateStore
  with strict routing (no mixed bucket).
- extractor.py: TypedLinkExtractor, deterministic regex-based extraction
  of 7 typed relation types on every write (no LLM calls):
  DEFINES, MODIFIES, REFERENCES, DEPENDS_ON, CONTRADICTS, PREFERS, LOCATES
- recall.py: RecallEngine with 3 explicit recall ops: search/describe/expand
- db.py: AtlasDB SQLite connection/schema bootstrap

agent/atlas_memory.py — AtlasMemory facade (higher-level API)
agent/lossless_context.py — JSONL-based lossless context subsystem
plugins/memory/atlas/ — MemoryProvider plugin wiring into Hermes lifecycle
tools/lossless_recall_tool.py — lossless_recall tool (search/describe/expand)
tools/memory_tool.py — extended with world_knowledge store target

tests/ — 183 fixture-backed tests covering all acceptance criteria:
- Every turn persisted with stable lineage IDs 
- Compaction builds retrievable summary DAG nodes with source references 
- 3 explicit recall operations (search/describe/expand) 
- Writes route to explicit stores (no mixed bucket) 
- 7 typed relation types with fixture-backed tests 
- test_recover_fact_from_compacted_context proves fact recovery without
  re-injecting the full original transcript 

Fixes #985
This commit is contained in:
Alexander Whitestone
2026-04-22 10:15:21 -04:00
parent d574690abe
commit 75e31bee27
24 changed files with 5568 additions and 12 deletions

View File

@@ -0,0 +1,13 @@
"""ATLAS memory plugin package.
Registers the AtlasMemoryProvider with Hermes memory plugin discovery.
"""
from plugins.memory.atlas.provider import AtlasMemoryProvider
def register(manager):
    """Plugin entry point invoked by the MemoryManager plugin loader.

    Instantiates the ATLAS provider and registers it with *manager*,
    but only when the provider reports itself as available.
    """
    candidate = AtlasMemoryProvider()
    if not candidate.is_available():
        return
    manager.add_provider(candidate)

View File

@@ -0,0 +1,9 @@
# Plugin manifest consumed by the Hermes memory plugin loader.
name: atlas
# Human-readable summary shown in plugin listings.
description: >
  ATLAS lossless context + memory subsystem.
  Persists every turn with a stable lineage ID, builds summary DAG nodes
  during compaction, and routes writes to explicit stores (world knowledge,
  durable memory, session state). Exposes atlas_search, atlas_describe,
  and atlas_expand recall tools.
version: "1.0.0"
author: hermes-agent

View File

@@ -0,0 +1,316 @@
"""AtlasMemoryProvider — MemoryProvider integration for the ATLAS subsystem.
Wires the ATLAS core (RawTurnStore, SummaryDAGStore, AtlasStores,
TypedLinkExtractor, RecallEngine) into Hermes' MemoryProvider lifecycle.
Responsibilities
----------------
* On every turn: persist the raw turn to RawTurnStore (lossless).
* On pre-compress: build a SummaryDAGStore node from the messages being
discarded (DAG compaction — no destructive deletion).
* Provide tool schemas for atlas_search, atlas_describe, atlas_expand.
* Route explicit memory writes to the correct store via AtlasStores.
* Extract typed links on every turn write.
Configuration (config.yaml)
---------------------------
memory:
provider: atlas
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
logger = logging.getLogger(__name__)
class AtlasMemoryProvider(MemoryProvider):
"""Lossless context + memory subsystem for Hermes."""
# ------------------------------------------------------------------
# MemoryProvider identity
@property
def name(self) -> str:
    """Provider identifier used for selection (``memory.provider: atlas``)."""
    return "atlas"
def is_available(self) -> bool:
    """Report availability — always True (pure-local SQLite, no external deps)."""
    return True
# ------------------------------------------------------------------
# Lifecycle
def initialize(self, session_id: str, **kwargs) -> None:
    """Open the ATLAS database and build the core component graph.

    ``hermes_home`` may be supplied via kwargs; it defaults to
    ``~/.hermes`` and the database lives at ``<hermes_home>/atlas.db``.
    """
    home = Path(kwargs.get("hermes_home", Path.home() / ".hermes"))
    database_path = home / "atlas.db"
    # Deferred imports: the ATLAS core is only pulled in once the
    # provider is actually initialised.
    from agent.atlas.db import AtlasDB
    from agent.atlas.dag import SummaryDAGStore
    from agent.atlas.extractor import TypedLinkExtractor
    from agent.atlas.recall import RecallEngine
    from agent.atlas.stores import AtlasStores
    from agent.atlas.turns import RawTurnStore
    database = AtlasDB(database_path)
    database.open()
    self._db = database
    self._turns = RawTurnStore(database)
    self._dag = SummaryDAGStore(database)
    self._stores = AtlasStores(database)
    self._extractor = TypedLinkExtractor(database)
    self._recall = RecallEngine(database)
    self._session_id = session_id
    logger.info("ATLAS provider initialised (session=%s, db=%s)", session_id, database_path)
def shutdown(self) -> None:
    """Release the database handle opened by initialize(); no-op otherwise."""
    if not hasattr(self, "_db"):
        return
    self._db.close()
# ------------------------------------------------------------------
# System prompt
def system_prompt_block(self) -> str:
    """Return the ATLAS usage blurb for the system prompt.

    Empty string before initialize() has run (no recall engine yet).
    """
    if not hasattr(self, "_recall"):
        return ""
    lines = [
        "## ATLAS Memory",
        "Your conversation is persisted losslessly in the ATLAS memory subsystem.",
        "Use these tools to recall compacted context:",
        " atlas_search(query) — full-text search over turns, summaries, and memory",
        " atlas_describe(id) — describe a specific turn or summary node",
        " atlas_expand(node_id) — expand a summary node to its source turns",
    ]
    return "\n".join(lines) + "\n"
# ------------------------------------------------------------------
# Turn persistence
def sync_turn(
    self, user_content: str, assistant_content: str, *, session_id: str = ""
) -> None:
    """Persist the user/assistant exchange and extract typed links.

    No-op before initialize(). The assistant turn is stamped 1ms after
    the user turn so ordering within the exchange stays stable. Empty
    contents are skipped.
    """
    if not hasattr(self, "_turns"):
        return
    sid = session_id or getattr(self, "_session_id", "default")
    now = time.time()
    pending = (
        ("user", user_content, now),
        ("assistant", assistant_content, now + 0.001),
    )
    for role, text, stamp in pending:
        if not text:
            continue
        tid = self._turns.append(sid, role, text, timestamp=stamp)
        self._extractor.extract_and_store(text, tid)
# ------------------------------------------------------------------
# Compaction hook — produces summary DAG nodes
def on_pre_compress(self, messages: List[Dict[str, Any]]) -> str:
    """Build a DAG summary node from messages about to be compressed.

    The summary text is derived from assistant messages in the to-be-
    compressed window (no LLM call — uses first 500 chars of each
    assistant turn as an extractive summary). More sophisticated
    summarisation can be added later without changing the interface.

    Returns the summary text prefixed with the new node id (so the
    compressor can embed it in its prompt), or "" when there is nothing
    to summarise.
    """
    if not hasattr(self, "_dag") or not messages:
        return ""
    sid = getattr(self, "_session_id", "default")
    # Fix: fetch the session's persisted turns at most ONCE. The previous
    # version called get_session_turns() inside the per-message loop —
    # one full store read per message, O(messages × turns) comparisons.
    session_turns = None
    source_turn_ids = []
    summary_parts = []
    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if not content or not isinstance(content, str):
            continue
        # Best-effort match to persisted turn_ids via a 200-char content
        # prefix (exact matching not required; lineage is tracked by session).
        if role in ("user", "assistant", "tool"):
            if session_turns is None:
                session_turns = self._turns.get_session_turns(sid)
            prefix = content.strip()[:200]
            for t in session_turns:
                if t.content.strip()[:200] == prefix:
                    source_turn_ids.append(t.turn_id)
                    break
        if role == "assistant":
            summary_parts.append(content[:500])
    if not summary_parts:
        return ""
    summary_text = "\n\n".join(summary_parts)[:2000]
    node = self._dag.add_node(
        sid,
        summary_text=summary_text,
        source_turn_ids=source_turn_ids,
    )
    # Also extract typed links from the summary itself, tagged as DAG-sourced.
    self._extractor.extract_and_store(
        summary_text, node.node_id, source_type="dag"
    )
    logger.info(
        "ATLAS: compaction DAG node %s created (%d source turns)",
        node.node_id,
        node.source_count(),
    )
    return f"[ATLAS compaction summary — node {node.node_id}]\n{summary_text}"
# ------------------------------------------------------------------
# Tool schemas
def get_tool_schemas(self) -> List[Dict[str, Any]]:
    """Return JSON-schema definitions for the four ATLAS tools."""

    def tool(name, description, properties, required):
        # Common envelope shared by every ATLAS tool definition.
        return {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        }

    search = tool(
        "atlas_search",
        "Search the ATLAS memory corpus (raw turns, DAG summary nodes, "
        "durable memory, world knowledge) using full-text search. "
        "Use this to recall facts from earlier in the conversation even "
        "after context compaction.",
        {
            "query": {
                "type": "string",
                "description": "Free-text search query.",
            },
            "doc_types": {
                "type": "array",
                "items": {"type": "string", "enum": ["raw_turn", "dag", "durable", "world"]},
                "description": "Limit results to these doc types (omit for all).",
            },
            "limit": {
                "type": "integer",
                "description": "Max results to return (default 10).",
                "default": 10,
            },
        },
        ["query"],
    )
    describe = tool(
        "atlas_describe",
        "Describe a specific turn or DAG summary node by its lineage ID. "
        "Turn IDs look like 'session_id:NNNN'. "
        "DAG node IDs look like 'dag:session_id:NNNN'.",
        {
            "id": {
                "type": "string",
                "description": "The turn_id or DAG node_id to describe.",
            },
        },
        ["id"],
    )
    expand = tool(
        "atlas_expand",
        "Expand a DAG summary node to reveal the source turns it compacted. "
        "Lets you recover specific facts from compacted context without "
        "re-injecting the full original transcript.",
        {
            "node_id": {
                "type": "string",
                "description": "The DAG node_id to expand (e.g. 'dag:session_id:0000').",
            },
            "max_turns": {
                "type": "integer",
                "description": "Maximum source turns to include (default 20).",
                "default": 20,
            },
        },
        ["node_id"],
    )
    store = tool(
        "atlas_store",
        "Write a fact to the appropriate ATLAS store. "
        "Use 'world:' prefix for world knowledge, 'session:' for session state, "
        "or no prefix for durable memory.",
        {
            "content": {
                "type": "string",
                "description": "The fact to store.",
            },
            "category": {
                "type": "string",
                "description": "Category tag (e.g. 'user_pref', 'project', 'world:geography').",
            },
        },
        ["content"],
    )
    return [search, describe, expand, store]
# ------------------------------------------------------------------
# Tool dispatch
def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
    """Dispatch one ATLAS tool call and return a JSON-encoded result.

    Unknown tools and any exception (including missing required args,
    which surface as KeyError) are reported as ``{"error": ...}`` rather
    than raised, so a bad tool call never crashes the agent loop.
    """
    if not hasattr(self, "_recall"):
        return json.dumps({"error": "ATLAS provider not initialised"})
    try:
        if tool_name == "atlas_search":
            result = self._recall.search(
                args["query"],
                limit=args.get("limit", 10),
                doc_types=args.get("doc_types"),
            )
            return json.dumps({"result": result})
        elif tool_name == "atlas_describe":
            result = self._recall.describe(args["id"])
            return json.dumps({"result": result})
        elif tool_name == "atlas_expand":
            result = self._recall.expand(
                args["node_id"],
                max_turns=args.get("max_turns", 20),
            )
            return json.dumps({"result": result})
        elif tool_name == "atlas_store":
            sid = getattr(self, "_session_id", "default")
            target, row_id = self._stores.write(
                args["content"],
                category=args.get("category", ""),
                session_id=sid,
            )
            return json.dumps({"result": f"Stored to {target.value} store (id={row_id})"})
        else:
            return json.dumps({"error": f"Unknown tool: {tool_name}"})
    except Exception as exc:
        # Fix: logger.exception records the traceback; logger.error dropped it,
        # making tool failures hard to diagnose.
        logger.exception("ATLAS tool %s failed: %s", tool_name, exc)
        return json.dumps({"error": str(exc)})
# ------------------------------------------------------------------
# Config schema (for `hermes memory setup`)
def get_config_schema(self) -> List[Dict[str, Any]]:
    """No configuration fields — ATLAS is fully local and needs no credentials."""
    return []