Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
4488847c13 feat: Add Reasoning Trace HUD Component
Some checks failed
CI / test (pull_request) Failing after 57s
CI / validate (pull_request) Failing after 34s
Review Approval Gate / verify-review (pull_request) Failing after 6s
Closes #875

- Added new ReasoningTrace component for real-time reasoning visualization
- Shows agent's reasoning steps during complex task execution
- Supports step types: THINK, DECIDE, RECALL, PLAN, EXECUTE, VERIFY, DOUBT, MEMORY
- Includes confidence visualization, task tracking, and export functionality
- Integrated into existing GOFAI HUD system
2026-04-13 18:23:05 -04:00
9 changed files with 720 additions and 1240 deletions

View File

@@ -1,21 +0,0 @@
"""
agent — Cross-session agent memory and lifecycle hooks.
Provides persistent memory for agents via MemPalace integration.
Agents recall context at session start and write diary entries at session end.
Modules:
memory.py — AgentMemory class (recall, remember, diary)
memory_hooks.py — Session lifecycle hooks (drop-in integration)
"""
from agent.memory import AgentMemory, MemoryContext, SessionTranscript, create_agent_memory
from agent.memory_hooks import MemoryHooks
__all__ = [
"AgentMemory",
"MemoryContext",
"MemoryHooks",
"SessionTranscript",
"create_agent_memory",
]

View File

@@ -1,396 +0,0 @@
"""
agent.memory — Cross-session agent memory via MemPalace.
Gives agents persistent memory across sessions. On wake-up, agents
recall relevant context from past sessions. On session end, they
write a diary entry summarizing what happened.
Architecture:
Session Start → memory.recall_context() → inject L0/L1 into prompt
During Session → memory.remember() → store important facts
Session End → memory.write_diary() → summarize session
All operations degrade gracefully — if MemPalace is unavailable,
the agent continues without memory and logs a warning.
Usage:
from agent.memory import AgentMemory
mem = AgentMemory(agent_name="bezalel", wing="wing_bezalel")
# Session start — load context
context = mem.recall_context("What was I working on last time?")
# During session — store important decisions
mem.remember("Switched CI runner from GitHub Actions to self-hosted", room="forge")
# Session end — write diary
mem.write_diary("Fixed PR #1386, reconciled fleet registry locations")
"""
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger("agent.memory")
@dataclass
class MemoryContext:
    """Context recalled from MemPalace when a session begins."""

    relevant_memories: list[dict] = field(default_factory=list)
    recent_diaries: list[dict] = field(default_factory=list)
    facts: list[dict] = field(default_factory=list)
    loaded: bool = False
    error: Optional[str] = None

    def to_prompt_block(self) -> str:
        """Render the recalled context as a text block for prompt injection.

        Returns an empty string when nothing was loaded or every section
        is empty; truncation limits mirror the loading caps.
        """
        if not self.loaded:
            return ""
        sections: list[str] = []
        if self.recent_diaries:
            sections.append("=== Recent Session Summaries ===")
            sections.extend(
                f"[{entry.get('timestamp', '')}] {entry.get('text', '')[:500]}"
                for entry in self.recent_diaries[:3]
            )
        if self.facts:
            sections.append("\n=== Known Facts ===")
            sections.extend(
                f"- {fact.get('text', '')[:200]}" for fact in self.facts[:10]
            )
        if self.relevant_memories:
            sections.append("\n=== Relevant Past Memories ===")
            sections.extend(
                f"[{mem.get('score', 0):.2f}] {mem.get('text', '')[:300]}"
                for mem in self.relevant_memories[:5]
            )
        return "\n".join(sections) if sections else ""
@dataclass
class SessionTranscript:
    """Accumulates the current session's turns for later diary writing."""

    agent_name: str
    wing: str
    started_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    entries: list[dict] = field(default_factory=list)

    def _append(self, entry: dict) -> None:
        # Stamp with the current wall-clock time, then store.
        entry["ts"] = time.time()
        self.entries.append(entry)

    def add_user_turn(self, text: str):
        """Record a user message (truncated to 2000 chars)."""
        self._append({"role": "user", "text": text[:2000]})

    def add_agent_turn(self, text: str):
        """Record an agent response (truncated to 2000 chars)."""
        self._append({"role": "agent", "text": text[:2000]})

    def add_tool_call(self, tool: str, args: str, result_summary: str):
        """Record a tool invocation with truncated args and result."""
        self._append({
            "role": "tool",
            "tool": tool,
            "args": args[:500],
            "result": result_summary[:500],
        })

    def summary(self) -> str:
        """Generate a compact transcript summary of the most recent turns."""
        if not self.entries:
            return "Empty session."
        lines: list[str] = []
        for entry in self.entries[-20:]:  # last 20 entries only
            role = entry["role"]
            if role == "user":
                lines.append(f"USER: {entry['text'][:200]}")
            elif role == "agent":
                lines.append(f"AGENT: {entry['text'][:200]}")
            elif role == "tool":
                lines.append(f"TOOL({entry.get('tool','')}): {entry.get('result','')[:150]}")
        return "\n".join(lines)
class AgentMemory:
    """
    Cross-session memory for an agent.

    Wraps MemPalace with agent-specific conventions:
      - Each agent has a wing (e.g., "wing_bezalel")
      - Session summaries go in the "hermes" room
      - Important decisions go in room-specific closets
      - Facts go in the "nexus" room

    All MemPalace access is lazy and guarded: if the palace cannot be
    reached, methods degrade to no-ops (None / [] / empty context) and
    log a warning instead of raising.
    """

    def __init__(
        self,
        agent_name: str,
        wing: Optional[str] = None,
        palace_path: Optional[Path] = None,
    ):
        """
        Args:
            agent_name: Name of the agent (e.g., "bezalel").
            wing: MemPalace wing; defaults to "wing_{agent_name}".
            palace_path: Optional override for the palace location.
        """
        self.agent_name = agent_name
        self.wing = wing or f"wing_{agent_name}"
        self.palace_path = palace_path
        # Transcript for the in-flight session (set by start_session).
        self._transcript: Optional[SessionTranscript] = None
        # Tri-state availability cache: None = not probed yet.
        self._available: Optional[bool] = None

    def _check_available(self) -> bool:
        """Probe MemPalace once and cache the result."""
        if self._available is not None:
            return self._available
        try:
            # Fix: only the client handle is needed for the probe;
            # search_memories/add_memory were imported here but unused —
            # they are imported where actually called.
            from nexus.mempalace.searcher import _get_client
            from nexus.mempalace.config import MEMPALACE_PATH
            path = self.palace_path or MEMPALACE_PATH
            _get_client(path)
            self._available = True
            logger.info("MemPalace available at %s", path)
        except Exception as e:
            self._available = False
            logger.warning("MemPalace unavailable: %s", e)
        return self._available

    def recall_context(
        self,
        query: Optional[str] = None,
        n_results: int = 5,
    ) -> MemoryContext:
        """
        Load relevant context from past sessions.

        Called at session start to inject L0/L1 memory into the prompt.

        Args:
            query: What to search for. If None, loads recent diary entries.
            n_results: Max memories to recall.

        Returns:
            A MemoryContext; ``loaded`` is False and ``error`` is set when
            recall failed or MemPalace is unavailable.
        """
        ctx = MemoryContext()
        if not self._check_available():
            ctx.error = "MemPalace unavailable"
            return ctx
        try:
            from nexus.mempalace.searcher import search_memories
            # Recent diary entries (session summaries) from the hermes room.
            ctx.recent_diaries = [
                {"text": r.text, "score": r.score, "timestamp": r.metadata.get("timestamp", "")}
                for r in search_memories(
                    "session summary",
                    palace_path=self.palace_path,
                    wing=self.wing,
                    room="hermes",
                    n_results=3,
                )
            ]
            # Known facts from the nexus room.
            ctx.facts = [
                {"text": r.text, "score": r.score}
                for r in search_memories(
                    "important facts decisions",
                    palace_path=self.palace_path,
                    wing=self.wing,
                    room="nexus",
                    n_results=5,
                )
            ]
            # Wing-wide relevance search only when a query was given.
            if query:
                ctx.relevant_memories = [
                    {"text": r.text, "score": r.score, "room": r.room}
                    for r in search_memories(
                        query,
                        palace_path=self.palace_path,
                        wing=self.wing,
                        n_results=n_results,
                    )
                ]
            ctx.loaded = True
        except Exception as e:
            ctx.error = str(e)
            logger.warning("Failed to recall context: %s", e)
        return ctx

    def remember(
        self,
        text: str,
        room: str = "nexus",
        source_file: str = "",
        metadata: Optional[dict] = None,
    ) -> Optional[str]:
        """
        Store a memory.

        Args:
            text: The memory content.
            room: Target room (forge, hermes, nexus, issues, experiments).
            source_file: Optional source attribution.
            metadata: Extra metadata.

        Returns:
            Document ID if stored, None if MemPalace unavailable or storing failed.
        """
        if not self._check_available():
            logger.warning("Cannot store memory — MemPalace unavailable")
            return None
        try:
            from nexus.mempalace.searcher import add_memory
            doc_id = add_memory(
                text=text,
                room=room,
                wing=self.wing,
                palace_path=self.palace_path,
                source_file=source_file,
                extra_metadata=metadata or {},
            )
            logger.debug("Stored memory in %s: %s...", room, text[:80])
            return doc_id
        except Exception as e:
            logger.warning("Failed to store memory: %s", e)
            return None

    def write_diary(
        self,
        summary: Optional[str] = None,
    ) -> Optional[str]:
        """
        Write a session diary entry to MemPalace.

        Called at session end. If summary is None, auto-generates one
        from the session transcript.

        Args:
            summary: Override summary text. If None, generates from transcript.

        Returns:
            Document ID if stored, None if there is nothing to write or
            MemPalace is unavailable.
        """
        if summary is None and self._transcript:
            summary = self._transcript.summary()
        if not summary:
            return None
        timestamp = datetime.now(timezone.utc).isoformat()
        diary_text = f"[{timestamp}] Session by {self.agent_name}:\n{summary}"
        return self.remember(
            diary_text,
            room="hermes",
            metadata={
                "type": "session_diary",
                "agent": self.agent_name,
                "timestamp": timestamp,
                "entry_count": len(self._transcript.entries) if self._transcript else 0,
            },
        )

    def start_session(self) -> SessionTranscript:
        """
        Begin a new session transcript.

        Returns the transcript object for recording turns. Any previous
        transcript is discarded.
        """
        self._transcript = SessionTranscript(
            agent_name=self.agent_name,
            wing=self.wing,
        )
        logger.info("Session started for %s", self.agent_name)
        return self._transcript

    def end_session(self, diary_summary: Optional[str] = None) -> Optional[str]:
        """
        End the current session, write diary, return diary doc ID.

        The transcript is cleared even if the diary write failed.
        """
        doc_id = self.write_diary(diary_summary)
        self._transcript = None
        logger.info("Session ended for %s", self.agent_name)
        return doc_id

    def search(
        self,
        query: str,
        room: Optional[str] = None,
        n_results: int = 5,
    ) -> list[dict]:
        """
        Search memories. Useful during a session for recall.

        Returns list of {text, room, wing, score} dicts; empty list when
        MemPalace is unavailable or the search failed.
        """
        if not self._check_available():
            return []
        try:
            from nexus.mempalace.searcher import search_memories
            results = search_memories(
                query,
                palace_path=self.palace_path,
                wing=self.wing,
                room=room,
                n_results=n_results,
            )
            return [
                {"text": r.text, "room": r.room, "wing": r.wing, "score": r.score}
                for r in results
            ]
        except Exception as e:
            logger.warning("Search failed: %s", e)
            return []
# --- Fleet-wide memory helpers ---
def create_agent_memory(
    agent_name: str,
    palace_path: Optional[Path] = None,
) -> AgentMemory:
    """
    Build an AgentMemory with the standard fleet configuration.

    The wing comes from the MEMPALACE_WING environment variable when set,
    otherwise it defaults to "wing_{agent_name}".
    """
    default_wing = f"wing_{agent_name}"
    return AgentMemory(
        agent_name=agent_name,
        wing=os.environ.get("MEMPALACE_WING", default_wing),
        palace_path=palace_path,
    )

View File

@@ -1,183 +0,0 @@
"""
agent.memory_hooks — Session lifecycle hooks for agent memory.
Integrates AgentMemory into the agent session lifecycle:
- on_session_start: Load context, inject into prompt
- on_user_turn: Record user input
- on_agent_turn: Record agent output
- on_tool_call: Record tool usage
- on_session_end: Write diary, clean up
These hooks are designed to be called from the Hermes harness or
any agent framework. They're fire-and-forget — failures are logged
but never crash the session.
Usage:
from agent.memory_hooks import MemoryHooks
hooks = MemoryHooks(agent_name="bezalel")
hooks.on_session_start() # loads context
# In your agent loop:
hooks.on_user_turn("Check CI pipeline health")
hooks.on_agent_turn("Running CI check...")
hooks.on_tool_call("shell", "pytest tests/", "12 passed")
# End of session:
hooks.on_session_end() # writes diary
"""
from __future__ import annotations
import logging
from typing import Optional
from agent.memory import AgentMemory, MemoryContext, create_agent_memory
logger = logging.getLogger("agent.memory_hooks")
class MemoryHooks:
    """
    Drop-in session lifecycle hooks for agent memory.

    Wraps AgentMemory with error boundaries — every hook catches
    exceptions and logs warnings so memory failures never crash
    the agent session.
    """

    def __init__(
        self,
        agent_name: str,
        palace_path=None,
        auto_diary: bool = True,
    ):
        """
        Args:
            agent_name: Agent whose memory to manage.
            palace_path: Optional override for the palace location.
            auto_diary: Whether a diary should be written automatically.
        """
        self.agent_name = agent_name
        self.auto_diary = auto_diary
        # Fix: palace_path was accepted but never stored, so the `memory`
        # property's getattr(self, '_palace_path', None) always saw None
        # and the override was silently dropped. Persist it here.
        self._palace_path = palace_path
        self._memory: Optional[AgentMemory] = None
        self._context: Optional[MemoryContext] = None
        self._active = False

    @property
    def memory(self) -> AgentMemory:
        """Lazily construct (and cache) the underlying AgentMemory."""
        if self._memory is None:
            self._memory = create_agent_memory(
                self.agent_name,
                palace_path=self._palace_path,
            )
        return self._memory

    def on_session_start(self, query: Optional[str] = None) -> str:
        """
        Called at session start. Loads context from MemPalace.

        Returns a prompt block to inject into the agent's context, or
        empty string if memory is unavailable.

        Args:
            query: Optional recall query (e.g., "What was I working on?")
        """
        try:
            self.memory.start_session()
            self._active = True
            self._context = self.memory.recall_context(query=query)
            block = self._context.to_prompt_block()
            if block:
                logger.info(
                    f"Loaded {len(self._context.recent_diaries)} diaries, "
                    f"{len(self._context.facts)} facts, "
                    f"{len(self._context.relevant_memories)} relevant memories "
                    f"for {self.agent_name}"
                )
            else:
                logger.info(f"No prior memory for {self.agent_name}")
            return block
        except Exception as e:
            logger.warning(f"Session start memory hook failed: {e}")
            return ""

    def on_user_turn(self, text: str):
        """Record a user message. No-op when no session is active."""
        if not self._active:
            return
        try:
            if self.memory._transcript:
                self.memory._transcript.add_user_turn(text)
        except Exception as e:
            logger.debug(f"Failed to record user turn: {e}")

    def on_agent_turn(self, text: str):
        """Record an agent response. No-op when no session is active."""
        if not self._active:
            return
        try:
            if self.memory._transcript:
                self.memory._transcript.add_agent_turn(text)
        except Exception as e:
            logger.debug(f"Failed to record agent turn: {e}")

    def on_tool_call(self, tool: str, args: str, result_summary: str):
        """Record a tool invocation. No-op when no session is active."""
        if not self._active:
            return
        try:
            if self.memory._transcript:
                self.memory._transcript.add_tool_call(tool, args, result_summary)
        except Exception as e:
            logger.debug(f"Failed to record tool call: {e}")

    def on_important_decision(self, text: str, room: str = "nexus"):
        """
        Record an important decision or fact for long-term memory.

        Use this when the agent makes a significant decision that
        should persist beyond the current session. Works even when no
        session is active.
        """
        try:
            self.memory.remember(text, room=room, metadata={"type": "decision"})
            logger.info(f"Remembered decision: {text[:80]}...")
        except Exception as e:
            logger.warning(f"Failed to remember decision: {e}")

    def on_session_end(self, summary: Optional[str] = None) -> Optional[str]:
        """
        Called at session end. Writes diary entry.

        Args:
            summary: Override diary text. If None, auto-generates.

        Returns:
            Diary document ID, or None (including when no session is active).
        """
        if not self._active:
            return None
        try:
            doc_id = self.memory.end_session(diary_summary=summary)
            self._active = False
            self._context = None
            return doc_id
        except Exception as e:
            logger.warning(f"Session end memory hook failed: {e}")
            # Deactivate even on failure so later hooks stay no-ops.
            self._active = False
            return None

    def search(self, query: str, room: Optional[str] = None) -> list[dict]:
        """
        Search memories during a session.

        Returns list of {text, room, wing, score}; empty on failure.
        """
        try:
            return self.memory.search(query, room=room)
        except Exception as e:
            logger.warning(f"Memory search failed: {e}")
            return []

    @property
    def is_active(self) -> bool:
        # True between a successful on_session_start and on_session_end.
        return self._active

2
app.js
View File

@@ -9,6 +9,7 @@ import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
import { ReasoningTrace } from './nexus/components/reasoning-trace.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
@@ -758,6 +759,7 @@ async function init() {
SpatialAudio.bindSpatialMemory(SpatialMemory);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(SpatialMemory);
ReasoningTrace.init();
updateLoad(90);
loadSession();

View File

@@ -1,258 +0,0 @@
#!/usr/bin/env python3
"""
memory_mine.py — Mine session transcripts into MemPalace.
Reads Hermes session logs (JSONL format) and stores summaries
in the palace. Supports batch mining, single-file processing,
and live directory watching.
Usage:
# Mine a single session file
python3 bin/memory_mine.py ~/.hermes/sessions/2026-04-13.jsonl
# Mine all sessions from last 7 days
python3 bin/memory_mine.py --days 7
# Mine a specific wing's sessions
python3 bin/memory_mine.py --wing wing_bezalel --days 14
# Dry run — show what would be mined
python3 bin/memory_mine.py --dry-run --days 7
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# Script-wide logging: timestamped INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("memory-mine")
# This script lives in bin/; put the repository root on sys.path so
# `from agent.memory import AgentMemory` resolves when run directly.
REPO_ROOT = Path(__file__).resolve().parent.parent
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
def parse_session_file(path: Path) -> list[dict]:
    """
    Parse a JSONL session file into a list of turn dicts.

    Each line is expected to be a JSON object with:
      - role: "user" | "assistant" | "system" | "tool"
      - content: text
      - timestamp: ISO string (optional)

    Blank lines are ignored; malformed lines are skipped (logged at DEBUG).
    """
    turns: list[dict] = []
    # Fix: open with explicit UTF-8 — JSON's interchange encoding — rather
    # than the platform default, which breaks non-ASCII transcripts on
    # platforms with a legacy default encoding.
    with open(path, encoding="utf-8") as f:
        for i, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            try:
                turn = json.loads(line)
                turns.append(turn)
            except json.JSONDecodeError:
                logger.debug(f"Skipping malformed line {i+1} in {path}")
    return turns
def summarize_session(turns: list[dict], agent_name: str = "unknown") -> str:
    """
    Build a compact textual summary of a session's turns.

    Keeps user messages and key agent responses, strips noise.
    """
    if not turns:
        return "Empty session."
    # Bucket the turns by role in a single pass.
    by_role: dict[str, list[str]] = {"user": [], "assistant": [], "tool": []}
    for turn in turns:
        role = turn.get("role", "")
        content = str(turn.get("content", ""))[:300]
        if role in ("user", "assistant"):
            by_role[role].append(content)
        elif role == "tool":
            tool_name = turn.get("name", turn.get("tool", "unknown"))
            by_role["tool"].append(f"{tool_name}: {content[:150]}")
    user_msgs = by_role["user"]
    agent_msgs = by_role["assistant"]
    tool_calls = by_role["tool"]
    parts = [f"Session by {agent_name}:"]
    if user_msgs:
        parts.append(f"\nUser asked ({len(user_msgs)} messages):")
        parts.extend(f" - {msg[:200]}" for msg in user_msgs[:5])
        if len(user_msgs) > 5:
            parts.append(f" ... and {len(user_msgs) - 5} more")
    if agent_msgs:
        parts.append(f"\nAgent responded ({len(agent_msgs)} messages):")
        parts.extend(f" - {msg[:200]}" for msg in agent_msgs[:3])
    if tool_calls:
        parts.append(f"\nTools used ({len(tool_calls)} calls):")
        parts.extend(f" - {tc}" for tc in tool_calls[:5])
    return "\n".join(parts)
def mine_session(
    path: Path,
    wing: str,
    palace_path: Optional[Path] = None,
    dry_run: bool = False,
) -> Optional[str]:
    """
    Mine a single session file into MemPalace.

    Parses the JSONL transcript, summarizes it, and stores the summary in
    the wing's "hermes" room with provenance metadata.

    Args:
        path: Session JSONL file to mine.
        wing: Target wing (e.g. "wing_bezalel"); the agent name is derived
            by stripping the "wing_" prefix.
        palace_path: Optional palace location override.
        dry_run: If True, print the would-be summary and store nothing.

    Returns:
        The document ID if stored, None on failure or dry run.
    """
    try:
        from agent.memory import AgentMemory
    except ImportError:
        logger.error("Cannot import agent.memory — is the repo in PYTHONPATH?")
        return None
    turns = parse_session_file(path)
    if not turns:
        logger.debug(f"Empty session file: {path}")
        return None
    agent_name = wing.replace("wing_", "")
    summary = summarize_session(turns, agent_name)
    if dry_run:
        print(f"\n--- {path.name} ---")
        print(summary[:500])
        print(f"({len(turns)} turns)")
        return None
    mem = AgentMemory(agent_name=agent_name, wing=wing, palace_path=palace_path)
    doc_id = mem.remember(
        summary,
        room="hermes",
        source_file=str(path),
        metadata={
            "type": "mined_session",
            "source": str(path),
            "turn_count": len(turns),
            "agent": agent_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
    if doc_id:
        # Fix: the two placeholders were fused ("{path.name}{doc_id}"),
        # producing an unreadable log line — separate them explicitly.
        logger.info(f"Mined {path.name} -> {doc_id} ({len(turns)} turns)")
    else:
        logger.warning(f"Failed to mine {path.name}")
    return doc_id
def find_session_files(
    sessions_dir: Path,
    days: int = 7,
    pattern: str = "*.jsonl",
) -> list[Path]:
    """
    Return session files under *sessions_dir* touched within the last *days* days.

    File modification time is used as a proxy for the session date.
    Returns an empty list (with a warning) when the directory is missing.
    """
    if not sessions_dir.exists():
        logger.warning(f"Sessions directory not found: {sessions_dir}")
        return []
    cutoff = datetime.now() - timedelta(days=days)
    return [
        candidate
        for candidate in sorted(sessions_dir.glob(pattern))
        if datetime.fromtimestamp(candidate.stat().st_mtime) >= cutoff
    ]
def _build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the mining script."""
    parser = argparse.ArgumentParser(
        description="Mine session transcripts into MemPalace"
    )
    parser.add_argument(
        "files", nargs="*", help="Session files to mine (JSONL format)"
    )
    parser.add_argument(
        "--days", type=int, default=7,
        help="Mine sessions from last N days (default: 7)"
    )
    parser.add_argument(
        "--sessions-dir",
        default=str(Path.home() / ".hermes" / "sessions"),
        help="Directory containing session JSONL files"
    )
    parser.add_argument(
        "--wing", default=None,
        help="Wing name (default: auto-detect from MEMPALACE_WING env or 'wing_timmy')"
    )
    parser.add_argument(
        "--palace-path", default=None,
        help="Override palace path"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would be mined without storing"
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """
    CLI entry point: resolve options, collect session files, mine them.

    Always returns 0; per-file failures are logged, not fatal.
    """
    args = _build_parser().parse_args(argv)
    wing = args.wing or os.environ.get("MEMPALACE_WING", "wing_timmy")
    palace_path = Path(args.palace_path) if args.palace_path else None
    # An explicit file list wins; otherwise scan the sessions directory.
    if args.files:
        targets = [Path(f) for f in args.files]
    else:
        targets = find_session_files(Path(args.sessions_dir), days=args.days)
    if not targets:
        logger.info("No session files found to mine.")
        return 0
    logger.info(f"Mining {len(targets)} session files (wing={wing})")
    mined = 0
    failed = 0
    for target in targets:
        doc_id = mine_session(target, wing=wing, palace_path=palace_path, dry_run=args.dry_run)
        if doc_id:
            mined += 1
        elif doc_id is None and not args.dry_run:
            failed += 1
    if args.dry_run:
        logger.info(f"Dry run complete — {len(targets)} files would be mined")
    else:
        logger.info(f"Mining complete — {mined} mined, {failed} failed")
    return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())

View File

@@ -101,6 +101,19 @@
<div class="panel-header">ADAPTIVE CALIBRATOR</div>
<div id="calibrator-log-content" class="panel-content"></div>
</div>
<div class="hud-panel" id="reasoning-trace">
<div class="trace-header-container">
<div class="panel-header"><span class="trace-icon">🧠</span> REASONING TRACE</div>
<div class="trace-controls">
<button class="trace-btn" id="trace-clear" title="Clear trace">🗑️</button>
<button class="trace-btn" id="trace-toggle" title="Toggle visibility">👁️</button>
<button class="trace-btn" id="trace-export" title="Export trace">📤</button>
</div>
</div>
<div class="trace-task" id="trace-task">No active task</div>
<div class="trace-counter" id="trace-counter">0 steps</div>
<div id="reasoning-trace-content" class="panel-content trace-content"></div>
</div>
</div>
<!-- Evennia Room Snapshot Panel -->
@@ -152,18 +165,18 @@
<!-- Top Right: Agent Log, Atlas & SOUL Toggle -->
<div class="hud-top-right">
<button id="atlas-toggle-btn" class="hud-icon-btn" title="Portal Atlas">
<span class="hud-icon">🌐</span>
<span class="hud-btn-label">WORLDS</span>
</button>
<button id="atlas-toggle-btn" class="hud-icon-btn" title="World Directory">
<button id="soul-toggle-btn" class="hud-icon-btn" title="Timmy's SOUL">
<span class="hud-icon"></span>
<span class="hud-btn-label">SOUL</span>
</button>
<button id="mode-toggle-btn" class="hud-icon-btn mode-toggle" title="Toggle Mode">
<span class="hud-icon">👁</span>
<span class="hud-btn-label" id="mode-label">VISITOR</span>
</button>
<button id="atlas-toggle-btn" class="hud-icon-btn" title="Portal Atlas">
<span class="hud-icon">🌐</span>
<span class="hud-btn-label">WORLDS</span>
</button>
<div id="bannerlord-status" class="hud-status-item" title="Bannerlord Readiness">
<span class="status-dot"></span>
<span class="status-label">BANNERLORD</span>

View File

@@ -0,0 +1,451 @@
// ═══════════════════════════════════════════════════
// REASONING TRACE HUD COMPONENT
// ═══════════════════════════════════════════════════
//
// Displays a real-time trace of the agent's reasoning
// steps during complex task execution. Shows the chain
// of thought, decision points, and confidence levels.
//
// Usage:
// ReasoningTrace.init();
// ReasoningTrace.addStep(step);
// ReasoningTrace.clear();
// ReasoningTrace.toggle();
// ═══════════════════════════════════════════════════
const ReasoningTrace = (() => {
  // ── State ─────────────────────────────────────────
  let _container = null;      // root panel element (built lazily by _createDOM)
  let _content = null;        // step-list element inside the panel
  let _header = null;         // panel title-bar element
  let _steps = [];            // step records, newest first (see addStep)
  let _maxSteps = 20;         // backlog cap enforced in addStep
  let _isVisible = true;      // panel visibility flag (flipped by toggle)
  let _currentTask = null;    // id of the task currently being traced
  let _stepCounter = 0;       // monotonically increasing step-id suffix
  // ── Config ────────────────────────────────────────
  // Per-step-type presentation: icon, accent color, and display label.
  // Unknown types fall back to THINK (see _renderStep).
  const STEP_TYPES = {
    THINK: { icon: '💭', color: '#4af0c0', label: 'THINK' },
    DECIDE: { icon: '⚖️', color: '#ffd700', label: 'DECIDE' },
    RECALL: { icon: '🔍', color: '#7b5cff', label: 'RECALL' },
    PLAN: { icon: '📋', color: '#ff8c42', label: 'PLAN' },
    EXECUTE: { icon: '⚡', color: '#ff4466', label: 'EXECUTE' },
    VERIFY: { icon: '✅', color: '#4af0c0', label: 'VERIFY' },
    DOUBT: { icon: '❓', color: '#ff8c42', label: 'DOUBT' },
    MEMORY: { icon: '💾', color: '#7b5cff', label: 'MEMORY' }
  };
  // ── Helpers ───────────────────────────────────────
function _escapeHtml(s) {
  // Escape the five HTML-special characters in a single pass.
  const entities = { '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' };
  return String(s).replace(/[&<>"']/g, (ch) => entities[ch]);
}
function _formatTimestamp(timestamp) {
  // Render as a 24-hour HH:MM:SS clock time (viewer's local timezone).
  const opts = { hour12: false, hour: '2-digit', minute: '2-digit', second: '2-digit' };
  return new Date(timestamp).toLocaleTimeString('en-US', opts);
}
function _getConfidenceBar(confidence) {
  // Omit the bar entirely when no confidence was supplied.
  if (confidence === undefined || confidence === null) return '';
  // Clamp to [0, 100] and render as a 10-segment block gauge.
  const percent = Math.max(0, Math.min(100, Math.round(confidence * 100)));
  const filledCount = Math.round(percent / 10);
  const gauge = '█'.repeat(filledCount) + '░'.repeat(10 - filledCount);
  return `<span class="confidence-bar" title="${percent}% confidence">${gauge}</span>`;
}
// ── DOM Setup ─────────────────────────────────────
/**
 * Build the panel's DOM once and attach it to the HUD.
 * Idempotent: returns immediately if the container already exists.
 * Prefers the `.gofai-hud` sub-container; falls back to `#hud`; if
 * neither exists the panel is built but never attached.
 */
function _createDOM() {
  // Create container if it doesn't exist
  if (_container) return;
  _container = document.createElement('div');
  _container.id = 'reasoning-trace';
  _container.className = 'hud-panel reasoning-trace';
  _header = document.createElement('div');
  _header.className = 'panel-header';
  _header.innerHTML = `<span class="trace-icon">🧠</span> REASONING TRACE`;
  // Task indicator
  const taskIndicator = document.createElement('div');
  taskIndicator.className = 'trace-task';
  taskIndicator.id = 'trace-task';
  taskIndicator.textContent = 'No active task';
  // Step counter
  const stepCounter = document.createElement('div');
  stepCounter.className = 'trace-counter';
  stepCounter.id = 'trace-counter';
  stepCounter.textContent = '0 steps';
  // Controls
  const controls = document.createElement('div');
  controls.className = 'trace-controls';
  controls.innerHTML = `
<button class="trace-btn" id="trace-clear" title="Clear trace">🗑️</button>
<button class="trace-btn" id="trace-toggle" title="Toggle visibility">👁️</button>
<button class="trace-btn" id="trace-export" title="Export trace">📤</button>
`;
  // Header container
  const headerContainer = document.createElement('div');
  headerContainer.className = 'trace-header-container';
  headerContainer.appendChild(_header);
  headerContainer.appendChild(controls);
  // Content area
  _content = document.createElement('div');
  _content.className = 'panel-content trace-content';
  _content.id = 'reasoning-trace-content';
  // Assemble
  _container.appendChild(headerContainer);
  _container.appendChild(taskIndicator);
  _container.appendChild(stepCounter);
  _container.appendChild(_content);
  // Add to HUD
  const hud = document.getElementById('hud');
  if (hud) {
    const gofaiHud = hud.querySelector('.gofai-hud');
    if (gofaiHud) {
      gofaiHud.appendChild(_container);
    } else {
      hud.appendChild(_container);
    }
  }
  // Add event listeners — after insertion, so getElementById can find
  // the buttons; optional chaining tolerates a detached panel.
  document.getElementById('trace-clear')?.addEventListener('click', clear);
  document.getElementById('trace-toggle')?.addEventListener('click', toggle);
  document.getElementById('trace-export')?.addEventListener('click', exportTrace);
}
// ── Rendering ─────────────────────────────────────
/**
 * Build the DOM element for a single reasoning step.
 * Unknown step types fall back to THINK presentation. Optional fields
 * (reasoning, decision, alternatives, source) render only when present;
 * decision/alternatives/source are HTML-escaped because they go through
 * innerHTML, while thought/reasoning use textContent and need no escaping.
 * @param {Object} step - a step record created by addStep
 * @param {number} index - position in the render order (currently unused)
 * @returns {HTMLElement} the assembled step element
 */
function _renderStep(step, index) {
  const typeConfig = STEP_TYPES[step.type] || STEP_TYPES.THINK;
  const timestamp = _formatTimestamp(step.timestamp);
  const confidence = _getConfidenceBar(step.confidence);
  const stepEl = document.createElement('div');
  stepEl.className = `trace-step trace-step-${step.type.toLowerCase()}`;
  stepEl.dataset.stepId = step.id;
  // Step header — icon, colored type label, time, optional confidence bar.
  const header = document.createElement('div');
  header.className = 'trace-step-header';
  header.innerHTML = `
<span class="step-icon">${typeConfig.icon}</span>
<span class="step-type" style="color: ${typeConfig.color}">${typeConfig.label}</span>
<span class="step-time">${timestamp}</span>
${confidence}
`;
  // Step content
  const content = document.createElement('div');
  content.className = 'trace-step-content';
  if (step.thought) {
    const thought = document.createElement('div');
    thought.className = 'step-thought';
    thought.textContent = step.thought;
    content.appendChild(thought);
  }
  if (step.reasoning) {
    const reasoning = document.createElement('div');
    reasoning.className = 'step-reasoning';
    reasoning.textContent = step.reasoning;
    content.appendChild(reasoning);
  }
  if (step.decision) {
    const decision = document.createElement('div');
    decision.className = 'step-decision';
    decision.innerHTML = `<strong>Decision:</strong> ${_escapeHtml(step.decision)}`;
    content.appendChild(decision);
  }
  if (step.alternatives && step.alternatives.length > 0) {
    const alternatives = document.createElement('div');
    alternatives.className = 'step-alternatives';
    alternatives.innerHTML = `<strong>Alternatives:</strong> ${step.alternatives.map(a => _escapeHtml(a)).join(', ')}`;
    content.appendChild(alternatives);
  }
  if (step.source) {
    const source = document.createElement('div');
    source.className = 'step-source';
    source.innerHTML = `<strong>Source:</strong> ${_escapeHtml(step.source)}`;
    content.appendChild(source);
  }
  stepEl.appendChild(header);
  stepEl.appendChild(content);
  return stepEl;
}
/**
 * Repaint the whole panel from module state: task indicator, step
 * counter, and the full step list (or an empty-state placeholder).
 * Full re-render — the content area is cleared and rebuilt each call.
 */
function _render() {
  if (!_content) return;
  // Clear content
  _content.innerHTML = '';
  // Update task indicator
  const taskEl = document.getElementById('trace-task');
  if (taskEl) {
    taskEl.textContent = _currentTask || 'No active task';
    taskEl.className = _currentTask ? 'trace-task active' : 'trace-task';
  }
  // Update step counter (with singular/plural label)
  const counterEl = document.getElementById('trace-counter');
  if (counterEl) {
    counterEl.textContent = `${_steps.length} step${_steps.length !== 1 ? 's' : ''}`;
  }
  // Render steps (newest first) — sorted copy so _steps stays untouched
  const sortedSteps = [..._steps].sort((a, b) => b.timestamp - a.timestamp);
  for (let i = 0; i < sortedSteps.length; i++) {
    const stepEl = _renderStep(sortedSteps[i], i);
    _content.appendChild(stepEl);
    // Add separator between steps
    if (i < sortedSteps.length - 1) {
      const separator = document.createElement('div');
      separator.className = 'trace-separator';
      _content.appendChild(separator);
    }
  }
  // Show empty state if no steps
  if (_steps.length === 0) {
    const empty = document.createElement('div');
    empty.className = 'trace-empty';
    empty.innerHTML = `
<span class="empty-icon">💭</span>
<span class="empty-text">No reasoning steps yet</span>
<span class="empty-hint">Start a task to see the trace</span>
`;
    _content.appendChild(empty);
  }
}
// ── Public API ────────────────────────────────────
/**
 * Initialize the component: build the panel DOM (idempotent) and paint
 * the initial (empty) state. Call once after the HUD exists.
 */
function init() {
  _createDOM();
  _render();
  console.info('[ReasoningTrace] Initialized');
}
/**
 * Add a reasoning step to the trace.
 * @param {Object} step - The reasoning step
 * @param {string} step.type - Step type (THINK, DECIDE, RECALL, PLAN, EXECUTE, VERIFY, DOUBT, MEMORY)
 * @param {string} step.thought - The main thought/content
 * @param {string} [step.reasoning] - Detailed reasoning
 * @param {string} [step.decision] - Decision made
 * @param {string[]} [step.alternatives] - Alternative options considered
 * @param {string} [step.source] - Source of information
 * @param {number} [step.confidence] - Confidence level (0-1)
 * @param {string} [step.taskId] - Associated task ID
 * @returns {string|undefined} the generated step id, or undefined when rejected
 */
function addStep(step) {
  // A step must at least carry a thought; silently ignore anything else.
  if (!step || !step.thought) return;
  const clamp01 = (v) => Math.max(0, Math.min(1, v));
  const record = {
    id: `step-${++_stepCounter}-${Date.now()}`,
    timestamp: Date.now(),
    type: step.type || 'THINK',
    thought: step.thought,
    reasoning: step.reasoning || null,
    decision: step.decision || null,
    alternatives: step.alternatives || null,
    source: step.source || null,
    confidence: step.confidence !== undefined ? clamp01(step.confidence) : null,
    taskId: step.taskId || _currentTask
  };
  // Newest first, with a bounded backlog.
  _steps.unshift(record);
  if (_steps.length > _maxSteps) {
    _steps = _steps.slice(0, _maxSteps);
  }
  // Adopt the step's task when it differs from the current one.
  if (step.taskId && step.taskId !== _currentTask) {
    setTask(step.taskId);
  }
  _render();
  // Console echo for debugging.
  console.debug(`[ReasoningTrace] ${record.type}: ${record.thought}`);
  return record.id;
}
/**
 * Set the current task being traced.
 * Re-renders immediately so the task strip in the header updates.
 * @param {string} taskId - Task identifier
 */
function setTask(taskId) {
    _currentTask = taskId;
    _render();
    console.info(`[ReasoningTrace] Task set: ${taskId}`);
}
/**
 * Clear all steps from the trace.
 * Also resets the id counter, so step ids restart from 1 afterwards.
 */
function clear() {
    _steps = [];
    _stepCounter = 0;
    _render();
    console.info('[ReasoningTrace] Cleared');
}
/**
 * Toggle the visibility of the trace panel.
 * Flips the internal flag and applies the matching display style when the
 * container element exists.
 */
function toggle() {
    _isVisible = !_isVisible;
    const displayMode = _isVisible ? 'block' : 'none';
    if (_container) {
        _container.style.display = displayMode;
    }
    console.info(`[ReasoningTrace] Visibility: ${_isVisible ? 'shown' : 'hidden'}`);
}
/**
 * Export the trace as JSON.
 * Serializes the current task plus all steps, attempts to copy the JSON to
 * the clipboard (flashing feedback on the export button), and returns it.
 * @returns {string} JSON string of the trace
 */
function exportTrace() {
    const exportData = {
        task: _currentTask,
        exportedAt: new Date().toISOString(),
        steps: _steps.map(step => ({
            type: step.type,
            thought: step.thought,
            reasoning: step.reasoning,
            decision: step.decision,
            alternatives: step.alternatives,
            source: step.source,
            confidence: step.confidence,
            timestamp: new Date(step.timestamp).toISOString()
        }))
    };
    const json = JSON.stringify(exportData, null, 2);
    // navigator.clipboard exists only in secure contexts (https/localhost);
    // the original unconditional call threw a synchronous TypeError elsewhere.
    if (navigator.clipboard && typeof navigator.clipboard.writeText === 'function') {
        navigator.clipboard.writeText(json).then(() => {
            console.info('[ReasoningTrace] Copied to clipboard');
            // Show feedback: flash a checkmark on the export button.
            const btn = document.getElementById('trace-export');
            if (btn) {
                const original = btn.innerHTML;
                btn.innerHTML = '✅';
                setTimeout(() => { btn.innerHTML = original; }, 1000);
            }
        }).catch(err => {
            console.error('[ReasoningTrace] Failed to copy:', err);
        });
    } else {
        console.warn('[ReasoningTrace] Clipboard API unavailable; returning JSON only');
    }
    return json;
}
/**
 * Get the current trace data.
 * @returns {Object} Current trace state; the steps array is a defensive copy.
 */
function getTrace() {
    const snapshot = _steps.slice();
    return {
        task: _currentTask,
        steps: snapshot,
        stepCount: snapshot.length,
        isVisible: _isVisible
    };
}
/**
 * Get steps filtered by type.
 * @param {string} type - Step type to filter by
 * @returns {Array} Filtered steps
 */
function getStepsByType(type) {
    const matching = [];
    for (const candidate of _steps) {
        if (candidate.type === type) {
            matching.push(candidate);
        }
    }
    return matching;
}
/**
 * Get steps for a specific task.
 * @param {string} taskId - Task ID to filter by
 * @returns {Array} Filtered steps
 */
function getStepsByTask(taskId) {
    const matching = [];
    for (const candidate of _steps) {
        if (candidate.taskId === taskId) {
            matching.push(candidate);
        }
    }
    return matching;
}
/**
 * Mark the current task as complete.
 * Records a VERIFY step, then clears the task label after a short delay so
 * the completion remains visible for a moment.
 * @param {string} [result] - Optional result description
 */
function completeTask(result) {
    if (!_currentTask) {
        return;
    }
    addStep({
        type: 'VERIFY',
        thought: `Task completed: ${result || 'Success'}`,
        taskId: _currentTask
    });
    setTimeout(() => {
        _currentTask = null;
        _render();
    }, 2000);
}
// ── Return Public API ─────────────────────────────
return {
init,
addStep,
setTask,
clear,
toggle,
exportTrace,
getTrace,
getStepsByType,
getStepsByTask,
completeTask,
STEP_TYPES
};
})();
export { ReasoningTrace };

249
style.css
View File

@@ -2685,3 +2685,252 @@ body.operator-mode #mode-label {
color: #ffd700;
}
/* ═══ REASONING TRACE COMPONENT ═══ */

/* Panel shell: fixed-width flex column; the inner .trace-content scrolls. */
.reasoning-trace {
    width: 320px;
    max-height: 400px;
    display: flex;
    flex-direction: column;
}
.trace-header-container {
    display: flex;
    justify-content: space-between;
    align-items: center;
    margin-bottom: 4px;
}
/* Suppress the generic panel-header divider inside the trace header row. */
.trace-header-container .panel-header {
    margin-bottom: 0;
    border-bottom: none;
    padding-bottom: 0;
}
.trace-icon {
    margin-right: 4px;
}
.trace-controls {
    display: flex;
    gap: 4px;
}
/* Header buttons (export / clear / toggle). */
.trace-btn {
    background: rgba(74, 240, 192, 0.1);
    border: 1px solid rgba(74, 240, 192, 0.2);
    color: #4af0c0;
    padding: 2px 6px;
    font-size: 10px;
    cursor: pointer;
    border-radius: 2px;
    transition: all 0.2s ease;
}
.trace-btn:hover {
    background: rgba(74, 240, 192, 0.2);
    border-color: #4af0c0;
}
/* Current-task strip; .active variant lights up while a task is set. */
.trace-task {
    font-size: 9px;
    color: #8899aa;
    margin-bottom: 4px;
    padding: 2px 6px;
    background: rgba(0, 0, 0, 0.2);
    border-radius: 2px;
    font-family: 'JetBrains Mono', monospace;
}
.trace-task.active {
    color: #4af0c0;
    background: rgba(74, 240, 192, 0.1);
    border-left: 2px solid #4af0c0;
}
.trace-counter {
    font-size: 9px;
    color: #667788;
    margin-bottom: 6px;
    font-family: 'JetBrains Mono', monospace;
}
/* Scrollable step list. */
.trace-content {
    flex: 1;
    overflow-y: auto;
    max-height: 300px;
}
/* One reasoning step. The left border color is overridden per step type
   below; #4af0c0 here is the fallback. */
.trace-step {
    margin-bottom: 8px;
    padding: 6px;
    background: rgba(0, 0, 0, 0.2);
    border-radius: 3px;
    border-left: 3px solid #4af0c0;
    transition: all 0.2s ease;
}
/* Per-type accent colors (teal / gold / violet / orange / red). */
.trace-step-think {
    border-left-color: #4af0c0;
}
.trace-step-decide {
    border-left-color: #ffd700;
}
.trace-step-recall {
    border-left-color: #7b5cff;
}
.trace-step-plan {
    border-left-color: #ff8c42;
}
.trace-step-execute {
    border-left-color: #ff4466;
}
.trace-step-verify {
    border-left-color: #4af0c0;
}
.trace-step-doubt {
    border-left-color: #ff8c42;
}
.trace-step-memory {
    border-left-color: #7b5cff;
}
/* Step header row: icon, type label, optional confidence bar, timestamp. */
.trace-step-header {
    display: flex;
    align-items: center;
    gap: 6px;
    margin-bottom: 4px;
    font-size: 10px;
}
.step-icon {
    font-size: 12px;
}
.step-type {
    font-weight: 700;
    letter-spacing: 0.5px;
    font-family: 'JetBrains Mono', monospace;
}
/* margin-left: auto pushes the timestamp to the right edge of the flex row. */
.step-time {
    color: #667788;
    font-size: 9px;
    margin-left: auto;
    font-family: 'JetBrains Mono', monospace;
}
.confidence-bar {
    font-family: 'JetBrains Mono', monospace;
    font-size: 9px;
    color: #4af0c0;
    letter-spacing: -1px;
    margin-left: 4px;
}
/* Step body sections: thought, reasoning, decision, alternatives, source. */
.trace-step-content {
    font-size: 11px;
    line-height: 1.4;
    color: #d9f7ff;
}
.step-thought {
    margin-bottom: 4px;
    font-style: italic;
    color: #e0f0ff;
}
.step-reasoning {
    margin-bottom: 4px;
    color: #aabbcc;
    font-size: 10px;
    padding-left: 8px;
    border-left: 1px solid rgba(74, 240, 192, 0.2);
}
.step-decision {
    margin-bottom: 4px;
    color: #ffd700;
    font-size: 10px;
}
.step-alternatives {
    margin-bottom: 4px;
    color: #8899aa;
    font-size: 10px;
}
.step-source {
    margin-bottom: 4px;
    color: #7b5cff;
    font-size: 10px;
}
/* Thin gradient rule between consecutive steps. */
.trace-separator {
    height: 1px;
    background: linear-gradient(90deg, transparent, rgba(74, 240, 192, 0.2), transparent);
    margin: 6px 0;
}
/* Placeholder shown when no steps exist yet. */
.trace-empty {
    display: flex;
    flex-direction: column;
    align-items: center;
    justify-content: center;
    padding: 20px;
    color: #667788;
    text-align: center;
}
.empty-icon {
    font-size: 24px;
    margin-bottom: 8px;
    opacity: 0.5;
}
.empty-text {
    font-size: 11px;
    margin-bottom: 4px;
    font-family: 'JetBrains Mono', monospace;
}
.empty-hint {
    font-size: 9px;
    color: #445566;
    font-family: 'JetBrains Mono', monospace;
}
/* Animation for new steps */
@keyframes trace-step-in {
    from {
        opacity: 0;
        transform: translateY(-10px);
    }
    to {
        opacity: 1;
        transform: translateY(0);
    }
}
/* Second .trace-step rule: attaches the entry animation defined above. */
.trace-step {
    animation: trace-step-in 0.3s ease-out;
}
/* Responsive adjustments */
@media (max-width: 768px) {
    .reasoning-trace {
        width: 280px;
    }
    .trace-content {
        max-height: 200px;
    }
}

View File

@@ -1,377 +0,0 @@
"""
Tests for agent memory — cross-session agent memory via MemPalace.
Tests the memory module, hooks, and session mining without requiring
a live ChromaDB instance. Uses mocking for the MemPalace backend.
"""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from agent.memory import (
AgentMemory,
MemoryContext,
SessionTranscript,
create_agent_memory,
)
from agent.memory_hooks import MemoryHooks
# ---------------------------------------------------------------------------
# SessionTranscript tests
# ---------------------------------------------------------------------------
class TestSessionTranscript:
    """SessionTranscript construction, turn recording, and summarization."""

    @staticmethod
    def _make():
        """Build the transcript fixture shared by every test."""
        return SessionTranscript(agent_name="test", wing="wing_test")

    def test_create(self):
        transcript = self._make()
        assert transcript.agent_name == "test"
        assert transcript.wing == "wing_test"
        assert len(transcript.entries) == 0

    def test_add_user_turn(self):
        transcript = self._make()
        transcript.add_user_turn("Hello")
        assert len(transcript.entries) == 1
        entry = transcript.entries[0]
        assert entry["role"] == "user"
        assert entry["text"] == "Hello"

    def test_add_agent_turn(self):
        transcript = self._make()
        transcript.add_agent_turn("Response")
        assert transcript.entries[0]["role"] == "agent"

    def test_add_tool_call(self):
        transcript = self._make()
        transcript.add_tool_call("shell", "ls", "file1 file2")
        entry = transcript.entries[0]
        assert entry["role"] == "tool"
        assert entry["tool"] == "shell"

    def test_summary_empty(self):
        assert self._make().summary() == "Empty session."

    def test_summary_with_entries(self):
        transcript = self._make()
        transcript.add_user_turn("Do something")
        transcript.add_agent_turn("Done")
        transcript.add_tool_call("shell", "ls", "ok")
        rendered = transcript.summary()
        assert "USER: Do something" in rendered
        assert "AGENT: Done" in rendered
        assert "TOOL(shell): ok" in rendered

    def test_text_truncation(self):
        transcript = self._make()
        transcript.add_user_turn("x" * 5000)
        # Stored text is capped at 2000 characters.
        assert len(transcript.entries[0]["text"]) == 2000
# ---------------------------------------------------------------------------
# MemoryContext tests
# ---------------------------------------------------------------------------
class TestMemoryContext:
    """Behavior of MemoryContext.to_prompt_block() in each load state."""

    def test_empty_context(self):
        # A freshly constructed context renders nothing.
        assert MemoryContext().to_prompt_block() == ""

    def test_unloaded_context(self):
        context = MemoryContext()
        context.loaded = False
        assert context.to_prompt_block() == ""

    def test_loaded_with_data(self):
        context = MemoryContext()
        context.loaded = True
        context.recent_diaries = [
            {"text": "Fixed PR #1386", "timestamp": "2026-04-13T10:00:00Z"}
        ]
        context.facts = [
            {"text": "Bezalel runs on VPS Beta", "score": 0.95}
        ]
        context.relevant_memories = [
            {"text": "Changed CI runner", "score": 0.87}
        ]
        rendered = context.to_prompt_block()
        expected_fragments = (
            "Recent Session Summaries",
            "Fixed PR #1386",
            "Known Facts",
            "Bezalel runs on VPS Beta",
            "Relevant Past Memories",
        )
        for fragment in expected_fragments:
            assert fragment in rendered

    def test_loaded_empty(self):
        context = MemoryContext()
        context.loaded = True
        # Loaded flag set but no data — still renders nothing.
        assert context.to_prompt_block() == ""
# ---------------------------------------------------------------------------
# AgentMemory tests (with mocked MemPalace)
# ---------------------------------------------------------------------------
class TestAgentMemory:
    """Tests for AgentMemory, exercising the degraded (no-MemPalace) path.

    The backend is forced unavailable via the private ``_available`` flag,
    so every operation must be a graceful no-op rather than a crash.
    """

    def test_create(self):
        # Default wing name is derived from the agent name ("wing_<agent>").
        mem = AgentMemory(agent_name="bezalel")
        assert mem.agent_name == "bezalel"
        assert mem.wing == "wing_bezalel"

    def test_custom_wing(self):
        # An explicit wing argument overrides the derived default.
        mem = AgentMemory(agent_name="bezalel", wing="custom_wing")
        assert mem.wing == "custom_wing"

    def test_factory(self):
        # create_agent_memory() mirrors the constructor's defaults.
        mem = create_agent_memory("ezra")
        assert mem.agent_name == "ezra"
        assert mem.wing == "wing_ezra"

    def test_unavailable_graceful(self):
        """Test graceful degradation when MemPalace is unavailable."""
        mem = AgentMemory(agent_name="test")
        mem._available = False  # Force unavailable
        # Should not raise
        ctx = mem.recall_context("test query")
        assert ctx.loaded is False
        assert ctx.error == "MemPalace unavailable"
        # remember returns None
        assert mem.remember("test") is None
        # search returns empty
        assert mem.search("test") == []

    def test_start_end_session(self):
        # start_session() creates a transcript; end_session() clears it even
        # when no backend exists to persist the diary.
        mem = AgentMemory(agent_name="test")
        mem._available = False
        transcript = mem.start_session()
        assert isinstance(transcript, SessionTranscript)
        assert mem._transcript is not None
        doc_id = mem.end_session()
        assert mem._transcript is None

    def test_remember_graceful_when_unavailable(self):
        """Test remember returns None when MemPalace is unavailable."""
        mem = AgentMemory(agent_name="test")
        mem._available = False
        doc_id = mem.remember("some important fact")
        assert doc_id is None

    def test_write_diary_from_transcript(self):
        # Diary writing summarizes the transcript; with the backend down it
        # must return None instead of raising.
        mem = AgentMemory(agent_name="test")
        mem._available = False
        transcript = mem.start_session()
        transcript.add_user_turn("Hello")
        transcript.add_agent_turn("Hi there")
        # Write diary should handle unavailable gracefully
        doc_id = mem.write_diary()
        assert doc_id is None  # MemPalace unavailable
# ---------------------------------------------------------------------------
# MemoryHooks tests
# ---------------------------------------------------------------------------
class TestMemoryHooks:
    """Tests for the MemoryHooks session-lifecycle wrapper.

    Each test swaps in an AgentMemory with ``_available`` forced False so
    the hooks are exercised without a live MemPalace backend.
    """

    def test_create(self):
        # A new hooks object knows its agent and starts inactive.
        hooks = MemoryHooks(agent_name="bezalel")
        assert hooks.agent_name == "bezalel"
        assert hooks.is_active is False

    def test_session_lifecycle(self):
        # Full start → record → end cycle; every call must be a no-op-safe
        # operation with the backend unavailable.
        hooks = MemoryHooks(agent_name="test")
        # Force memory unavailable
        hooks._memory = AgentMemory(agent_name="test")
        hooks._memory._available = False
        # Start session
        block = hooks.on_session_start()
        assert hooks.is_active is True
        assert block == ""  # No memory available
        # Record turns
        hooks.on_user_turn("Hello")
        hooks.on_agent_turn("Hi")
        hooks.on_tool_call("shell", "ls", "ok")
        # Record decision
        hooks.on_important_decision("Switched to self-hosted CI")
        # End session
        doc_id = hooks.on_session_end()
        assert hooks.is_active is False

    def test_hooks_before_session(self):
        """Hooks before session start should be no-ops."""
        hooks = MemoryHooks(agent_name="test")
        hooks._memory = AgentMemory(agent_name="test")
        hooks._memory._available = False
        # Should not raise
        hooks.on_user_turn("Hello")
        hooks.on_agent_turn("Response")

    def test_hooks_after_session_end(self):
        """Hooks after session end should be no-ops."""
        hooks = MemoryHooks(agent_name="test")
        hooks._memory = AgentMemory(agent_name="test")
        hooks._memory._available = False
        hooks.on_session_start()
        hooks.on_session_end()
        # Should not raise
        hooks.on_user_turn("Late message")
        doc_id = hooks.on_session_end()
        assert doc_id is None

    def test_search_during_session(self):
        # Search degrades to an empty result list when memory is unavailable.
        hooks = MemoryHooks(agent_name="test")
        hooks._memory = AgentMemory(agent_name="test")
        hooks._memory._available = False
        results = hooks.search("some query")
        assert results == []
# ---------------------------------------------------------------------------
# Session mining tests
# ---------------------------------------------------------------------------
class TestSessionMining:
    """Tests for the offline session-mining helpers in ``bin.memory_mine``.

    Imports are done inside each test so the module is only required when
    these tests actually run.
    """

    def test_parse_session_file(self):
        # Parser keeps valid JSONL turns and silently skips blank and
        # malformed lines.
        from bin.memory_mine import parse_session_file
        with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f:
            f.write('{"role": "user", "content": "Hello"}\n')
            f.write('{"role": "assistant", "content": "Hi there"}\n')
            f.write('{"role": "tool", "name": "shell", "content": "ls output"}\n')
            f.write("\n")  # blank line
            f.write("not json\n")  # malformed
        path = Path(f.name)
        turns = parse_session_file(path)
        assert len(turns) == 3
        assert turns[0]["role"] == "user"
        assert turns[1]["role"] == "assistant"
        assert turns[2]["role"] == "tool"
        path.unlink()

    def test_summarize_session(self):
        # Summary mentions the agent, user requests, and tools used.
        from bin.memory_mine import summarize_session
        turns = [
            {"role": "user", "content": "Check CI"},
            {"role": "assistant", "content": "Running CI check..."},
            {"role": "tool", "name": "shell", "content": "5 tests passed"},
            {"role": "assistant", "content": "CI is healthy"},
        ]
        summary = summarize_session(turns, "bezalel")
        assert "bezalel" in summary
        assert "Check CI" in summary
        assert "shell" in summary

    def test_summarize_empty(self):
        # No turns produces the canonical empty-session marker.
        from bin.memory_mine import summarize_session
        assert summarize_session([], "test") == "Empty session."

    def test_find_session_files(self, tmp_path):
        # Only *.jsonl files count as sessions.
        from bin.memory_mine import find_session_files
        # Create some test files
        (tmp_path / "session1.jsonl").write_text("{}\n")
        (tmp_path / "session2.jsonl").write_text("{}\n")
        (tmp_path / "notes.txt").write_text("not a session")
        files = find_session_files(tmp_path, days=365)
        assert len(files) == 2
        assert all(f.suffix == ".jsonl" for f in files)

    def test_find_session_files_missing_dir(self):
        # A nonexistent directory yields an empty list, not an error.
        from bin.memory_mine import find_session_files
        files = find_session_files(Path("/nonexistent/path"), days=7)
        assert files == []

    def test_mine_session_dry_run(self, tmp_path):
        # dry_run parses and summarizes but never stores — returns None.
        from bin.memory_mine import mine_session
        session_file = tmp_path / "test.jsonl"
        session_file.write_text(
            '{"role": "user", "content": "Hello"}\n'
            '{"role": "assistant", "content": "Hi"}\n'
        )
        result = mine_session(session_file, wing="wing_test", dry_run=True)
        assert result is None  # dry run doesn't store

    def test_mine_session_empty_file(self, tmp_path):
        # An empty session file mines to nothing.
        from bin.memory_mine import mine_session
        session_file = tmp_path / "empty.jsonl"
        session_file.write_text("")
        result = mine_session(session_file, wing="wing_test")
        assert result is None
# ---------------------------------------------------------------------------
# Integration test — full lifecycle
# ---------------------------------------------------------------------------
class TestFullLifecycle:
    """Test the full session lifecycle without a real MemPalace backend."""

    def test_full_session_flow(self):
        # End-to-end hook sequence: start → turns/tools/decisions → end,
        # all against a deliberately unavailable memory backend.
        hooks = MemoryHooks(agent_name="bezalel")
        # Force memory unavailable
        hooks._memory = AgentMemory(agent_name="bezalel")
        hooks._memory._available = False
        # 1. Session start
        context_block = hooks.on_session_start("What CI issues do I have?")
        assert isinstance(context_block, str)
        # 2. User asks question
        hooks.on_user_turn("Check CI pipeline health")
        # 3. Agent uses tool
        hooks.on_tool_call("shell", "pytest tests/", "12 passed")
        # 4. Agent responds
        hooks.on_agent_turn("CI pipeline is healthy. All 12 tests passing.")
        # 5. Important decision
        hooks.on_important_decision("Decided to keep current CI runner", room="forge")
        # 6. More interaction
        hooks.on_user_turn("Good, check memory integration next")
        hooks.on_agent_turn("Will test agent.memory module")
        # 7. Session end
        doc_id = hooks.on_session_end()
        assert hooks.is_active is False