diff --git a/src/config.py b/src/config.py
index 4de9ad46..6c8e6c87 100644
--- a/src/config.py
+++ b/src/config.py
@@ -311,6 +311,14 @@ class Settings(BaseSettings):
thinking_memory_check_every: int = 50 # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
+ # ── Dreaming Mode ─────────────────────────────────────────────────
+ # When enabled, the agent replays past sessions during idle time to
+ # simulate alternative actions and propose behavioural rules.
+ dreaming_enabled: bool = True
+ dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
+ dreaming_cycle_seconds: int = 600 # seconds between dream attempts
+ dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
+
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index 3ee815f9..fc141c1a 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -58,6 +58,7 @@ from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
+from dashboard.routes.dreaming import router as dreaming_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import matrix_router
from dashboard.routes.world import router as world_router
@@ -250,6 +251,36 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
+async def _dreaming_scheduler() -> None:
+ """Background task: run idle-time dreaming cycles.
+
+ When the system has been idle for ``dreaming_idle_threshold_minutes``,
+ the dreaming engine replays a past session and simulates alternatives.
+ """
+ from timmy.dreaming import dreaming_engine
+
+ await asyncio.sleep(15) # Stagger after loop QA scheduler
+
+ while True:
+ try:
+ if settings.dreaming_enabled:
+ await asyncio.wait_for(
+ dreaming_engine.dream_once(),
+ timeout=settings.dreaming_timeout_seconds + 10,
+ )
+ except TimeoutError:
+ logger.warning(
+ "Dreaming cycle timed out after %ds",
+ settings.dreaming_timeout_seconds,
+ )
+ except asyncio.CancelledError:
+ raise
+ except Exception as exc:
+ logger.error("Dreaming scheduler error: %s", exc)
+
+ await asyncio.sleep(settings.dreaming_cycle_seconds)
+
+
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -410,6 +441,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
+ asyncio.create_task(_dreaming_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
@@ -678,6 +710,7 @@ app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(three_strike_router)
+app.include_router(dreaming_router)
@app.websocket("/ws")
diff --git a/src/dashboard/routes/dreaming.py b/src/dashboard/routes/dreaming.py
new file mode 100644
index 00000000..77b49ea5
--- /dev/null
+++ b/src/dashboard/routes/dreaming.py
@@ -0,0 +1,84 @@
+"""Dreaming mode dashboard routes.
+
+GET /dreaming/api/status — JSON status of the dreaming engine
+GET /dreaming/api/recent — JSON list of recent dream records
+POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
+GET /dreaming/partial — HTMX partial: dreaming status panel
+"""
+
+import logging
+
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+
+from dashboard.templating import templates
+from timmy.dreaming import dreaming_engine
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/dreaming", tags=["dreaming"])
+
+
+@router.get("/api/status", response_class=JSONResponse)
+async def dreaming_status():
+ """Return current dreaming engine status as JSON."""
+ return dreaming_engine.get_status()
+
+
+@router.get("/api/recent", response_class=JSONResponse)
+async def dreaming_recent(limit: int = 10):
+ """Return recent dream records as JSON."""
+ dreams = dreaming_engine.get_recent_dreams(limit=limit)
+ return [
+ {
+ "id": d.id,
+ "session_excerpt": d.session_excerpt[:200],
+ "decision_point": d.decision_point[:200],
+ "simulation": d.simulation,
+ "proposed_rule": d.proposed_rule,
+ "created_at": d.created_at,
+ }
+ for d in dreams
+ ]
+
+
+@router.post("/api/trigger", response_class=JSONResponse)
+async def dreaming_trigger():
+ """Manually trigger a dream cycle (bypasses idle check).
+
+ Useful for testing and manual inspection. Forces idle state temporarily.
+ """
+ from datetime import UTC, datetime, timedelta
+ from config import settings
+
+ # Temporarily back-date last activity to appear idle
+ original_time = dreaming_engine._last_activity_time
+ dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
+ minutes=settings.dreaming_idle_threshold_minutes + 1
+ )
+
+ try:
+ dream = await dreaming_engine.dream_once()
+ finally:
+ dreaming_engine._last_activity_time = original_time
+
+ if dream:
+ return {
+ "status": "ok",
+ "dream_id": dream.id,
+ "proposed_rule": dream.proposed_rule,
+ "simulation": dream.simulation[:200],
+ }
+ return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
+
+
+@router.get("/partial", response_class=HTMLResponse)
+async def dreaming_partial(request: Request):
+ """HTMX partial: dreaming status panel for the dashboard."""
+ status = dreaming_engine.get_status()
+ recent = dreaming_engine.get_recent_dreams(limit=5)
+ return templates.TemplateResponse(
+ request,
+ "partials/dreaming_status.html",
+ {"status": status, "recent_dreams": recent},
+ )
diff --git a/src/dashboard/templates/partials/dreaming_status.html b/src/dashboard/templates/partials/dreaming_status.html
new file mode 100644
index 00000000..7a0d0f38
--- /dev/null
+++ b/src/dashboard/templates/partials/dreaming_status.html
@@ -0,0 +1,32 @@
+{% if not status.enabled %}
+
Dreaming mode disabled
+{% elif status.dreaming %}
+
+
+
DREAMING
+
{{ status.current_summary }}
+
+{% elif status.idle %}
+
+
+ IDLE
+ {{ status.idle_minutes }}m — dream cycle pending
+
+{% else %}
+
+
+ STANDBY
+ idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m
+
+{% endif %}
+
+{% if recent_dreams %}
+
+ {% for d in recent_dreams %}
+
+
{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}
+
{{ d.created_at[:16] | replace("T", " ") }}
+
+ {% endfor %}
+
+{% endif %}
diff --git a/src/timmy/dreaming.py b/src/timmy/dreaming.py
new file mode 100644
index 00000000..524a5713
--- /dev/null
+++ b/src/timmy/dreaming.py
@@ -0,0 +1,435 @@
+"""Dreaming Mode — idle-time session replay and counterfactual simulation.
+
+When the dashboard has been idle for a configurable period, this engine
+selects a past chat session, identifies key agent response points, and
+asks the LLM to simulate alternative approaches. Insights are stored as
+proposed rules that can feed the auto-crystallizer or memory system.
+
+Usage::
+
+ from timmy.dreaming import dreaming_engine
+
+ # Run one dream cycle (called by the background scheduler)
+ await dreaming_engine.dream_once()
+
+ # Query recent dreams
+ dreams = dreaming_engine.get_recent_dreams(limit=10)
+
+ # Get current status dict for API/dashboard
+ status = dreaming_engine.get_status()
+"""
+
+import json
+import logging
+import re
+import sqlite3
+import uuid
+from collections.abc import Generator
+from contextlib import closing, contextmanager
+from dataclasses import dataclass
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+from typing import Any
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_DB = Path("data/dreams.db")
+
+# Strip tags from reasoning model output
+_THINK_TAG_RE = re.compile(r".*?\s*", re.DOTALL)
+
+# Minimum messages in a session to be worth replaying
+_MIN_SESSION_MESSAGES = 3
+
+# Gap in seconds between messages that signals a new session
+_SESSION_GAP_SECONDS = 1800 # 30 minutes
+
+
+@dataclass
+class DreamRecord:
+ """A single completed dream cycle."""
+
+ id: str
+ session_excerpt: str # Short excerpt from the replayed session
+ decision_point: str # The agent message that was re-simulated
+ simulation: str # The alternative response generated
+ proposed_rule: str # Rule extracted from the simulation
+ created_at: str
+
+
+@contextmanager
+def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
+ db_path.parent.mkdir(parents=True, exist_ok=True)
+ with closing(sqlite3.connect(str(db_path))) as conn:
+ conn.row_factory = sqlite3.Row
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS dreams (
+ id TEXT PRIMARY KEY,
+ session_excerpt TEXT NOT NULL,
+ decision_point TEXT NOT NULL,
+ simulation TEXT NOT NULL,
+ proposed_rule TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL
+ )
+ """)
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
+ conn.commit()
+ yield conn
+
+
+def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
+ return DreamRecord(
+ id=row["id"],
+ session_excerpt=row["session_excerpt"],
+ decision_point=row["decision_point"],
+ simulation=row["simulation"],
+ proposed_rule=row["proposed_rule"],
+ created_at=row["created_at"],
+ )
+
+
+class DreamingEngine:
+ """Idle-time dreaming engine — replays sessions and simulates alternatives."""
+
+ def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
+ self._db_path = db_path
+ self._last_activity_time: datetime = datetime.now(UTC)
+ self._is_dreaming: bool = False
+ self._current_dream_summary: str = ""
+ self._dreaming_agent = None # Lazy-initialised
+
+ # ── Public API ────────────────────────────────────────────────────────
+
+ def record_activity(self) -> None:
+ """Reset the idle timer — call this on every user/agent interaction."""
+ self._last_activity_time = datetime.now(UTC)
+
+ def is_idle(self) -> bool:
+ """Return True if the system has been idle long enough to start dreaming."""
+ threshold = settings.dreaming_idle_threshold_minutes
+ if threshold <= 0:
+ return False
+ return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
+
+ def get_status(self) -> dict[str, Any]:
+ """Return a status dict suitable for API/dashboard consumption."""
+ return {
+ "enabled": settings.dreaming_enabled,
+ "dreaming": self._is_dreaming,
+ "idle": self.is_idle(),
+ "current_summary": self._current_dream_summary,
+ "idle_minutes": int(
+ (datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
+ ),
+ "idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
+ "dream_count": self.count_dreams(),
+ }
+
+ async def dream_once(self) -> DreamRecord | None:
+ """Execute one dream cycle.
+
+ Returns the stored DreamRecord, or None if the cycle was skipped
+ (not idle, dreaming disabled, no suitable session, or LLM error).
+ """
+ if not settings.dreaming_enabled:
+ return None
+
+ if not self.is_idle():
+ logger.debug(
+ "Dreaming skipped — system active (idle for %d min, threshold %d min)",
+ int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
+ settings.dreaming_idle_threshold_minutes,
+ )
+ return None
+
+ if self._is_dreaming:
+ logger.debug("Dreaming skipped — cycle already in progress")
+ return None
+
+ self._is_dreaming = True
+ self._current_dream_summary = "Selecting a past session…"
+ await self._broadcast_status()
+
+ try:
+ return await self._run_dream_cycle()
+ except Exception as exc:
+ logger.warning("Dream cycle failed: %s", exc)
+ return None
+ finally:
+ self._is_dreaming = False
+ self._current_dream_summary = ""
+ await self._broadcast_status()
+
+ def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
+ """Retrieve the most recent dream records."""
+ with _get_conn(self._db_path) as conn:
+ rows = conn.execute(
+ "SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
+ (limit,),
+ ).fetchall()
+ return [_row_to_dream(r) for r in rows]
+
+ def count_dreams(self) -> int:
+ """Return total number of stored dream records."""
+ with _get_conn(self._db_path) as conn:
+ row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
+ return row["c"] if row else 0
+
+ # ── Private helpers ───────────────────────────────────────────────────
+
+ async def _run_dream_cycle(self) -> DreamRecord | None:
+ """Core dream logic: select → simulate → store."""
+ # 1. Select a past session from the chat log
+ session = await self._select_session()
+ if not session:
+ logger.debug("No suitable chat session found for dreaming")
+ self._current_dream_summary = "No past sessions to replay"
+ return None
+
+ decision_point, session_excerpt = session
+
+ self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}…"
+ await self._broadcast_status()
+
+ # 2. Simulate an alternative response
+ simulation = await self._simulate_alternative(decision_point, session_excerpt)
+ if not simulation:
+ logger.debug("Dream simulation produced no output")
+ return None
+
+ # 3. Extract a proposed rule
+ proposed_rule = await self._extract_rule(decision_point, simulation)
+
+ # 4. Store and broadcast
+ dream = self._store_dream(
+ session_excerpt=session_excerpt,
+ decision_point=decision_point,
+ simulation=simulation,
+ proposed_rule=proposed_rule,
+ )
+
+ self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
+
+ logger.info(
+ "Dream [%s]: replayed session, proposed rule: %s",
+ dream.id[:8],
+ proposed_rule[:80] if proposed_rule else "(none)",
+ )
+
+ await self._broadcast_status()
+ await self._broadcast_dream(dream)
+ return dream
+
+ async def _select_session(self) -> tuple[str, str] | None:
+ """Select a past chat session and return (decision_point, session_excerpt).
+
+ Uses the SQLite chat store. Groups messages into sessions by time
+ gap. Picks a random session with enough messages, then selects one
+ agent response as the decision point.
+ """
+ try:
+ from infrastructure.chat_store import DB_PATH
+
+ if not DB_PATH.exists():
+ return None
+
+ import asyncio
+ rows = await asyncio.to_thread(self._load_chat_rows)
+ if not rows:
+ return None
+
+ sessions = self._group_into_sessions(rows)
+ if not sessions:
+ return None
+
+ # Filter sessions with enough messages
+ valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
+ if not valid:
+ return None
+
+ import random
+ session = random.choice(valid) # noqa: S311 (not cryptographic)
+
+ # Build a short text excerpt (last N messages)
+ excerpt_msgs = session[-6:]
+ excerpt = "\n".join(
+ f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
+ )
+
+ # Find agent responses as candidate decision points
+ agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
+ if not agent_msgs:
+ return None
+
+ decision = random.choice(agent_msgs) # noqa: S311
+ return decision["content"], excerpt
+
+ except Exception as exc:
+ logger.warning("Session selection failed: %s", exc)
+ return None
+
+ def _load_chat_rows(self) -> list[dict]:
+ """Synchronously load chat messages from SQLite."""
+ from infrastructure.chat_store import DB_PATH
+
+ with closing(sqlite3.connect(str(DB_PATH))) as conn:
+ conn.row_factory = sqlite3.Row
+ rows = conn.execute(
+ "SELECT role, content, timestamp FROM chat_messages "
+ "ORDER BY timestamp ASC"
+ ).fetchall()
+ return [dict(r) for r in rows]
+
+ def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
+ """Group chat rows into sessions based on time gaps."""
+ if not rows:
+ return []
+
+ sessions: list[list[dict]] = []
+ current: list[dict] = [rows[0]]
+
+ for prev, curr in zip(rows, rows[1:]):
+ try:
+ t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
+ t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
+ gap = (t_curr - t_prev).total_seconds()
+ except Exception:
+ gap = 0
+
+ if gap > _SESSION_GAP_SECONDS:
+ sessions.append(current)
+ current = [curr]
+ else:
+ current.append(curr)
+
+ sessions.append(current)
+ return sessions
+
+ async def _simulate_alternative(
+ self, decision_point: str, session_excerpt: str
+ ) -> str:
+ """Ask the LLM to simulate an alternative response."""
+ prompt = (
+ "You are Timmy, a sovereign AI agent in a dreaming state.\n"
+ "You are replaying a past conversation and exploring what you could "
+ "have done differently at a key decision point.\n\n"
+ "PAST SESSION EXCERPT:\n"
+ f"{session_excerpt}\n\n"
+ "KEY DECISION POINT (your past response):\n"
+ f"{decision_point[:500]}\n\n"
+ "TASK: In 2-3 sentences, describe ONE concrete alternative approach "
+ "you could have taken at this decision point that would have been "
+ "more helpful, more accurate, or more efficient.\n"
+ "Be specific — reference the actual content of the conversation.\n"
+ "Do NOT include meta-commentary about dreaming or this exercise.\n\n"
+ "Alternative approach:"
+ )
+
+ raw = await self._call_agent(prompt)
+ return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
+
+ async def _extract_rule(self, decision_point: str, simulation: str) -> str:
+ """Extract a proposed behaviour rule from the simulation."""
+ prompt = (
+ "Given this pair of agent responses:\n\n"
+ f"ORIGINAL: {decision_point[:300]}\n\n"
+ f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
+ "Extract ONE concise rule (max 20 words) that captures what to do "
+ "differently next time. Format: 'When X, do Y instead of Z.'\n"
+ "Rule:"
+ )
+
+ raw = await self._call_agent(prompt)
+ rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
+ # Keep only the first sentence/line
+ rule = rule.split("\n")[0].strip().rstrip(".")
+ return rule[:200] # Safety cap
+
+ async def _call_agent(self, prompt: str) -> str:
+ """Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
+ import asyncio
+
+ if self._dreaming_agent is None:
+ from timmy.agent import create_timmy
+
+ self._dreaming_agent = create_timmy(skip_mcp=True)
+
+ try:
+ async with asyncio.timeout(settings.dreaming_timeout_seconds):
+ run = await self._dreaming_agent.arun(prompt, stream=False)
+ except TimeoutError:
+ logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
+ return ""
+ except Exception as exc:
+ logger.warning("Dreaming LLM call failed: %s", exc)
+ return ""
+
+ raw = run.content if hasattr(run, "content") else str(run)
+ return raw or ""
+
+ def _store_dream(
+ self,
+ *,
+ session_excerpt: str,
+ decision_point: str,
+ simulation: str,
+ proposed_rule: str,
+ ) -> DreamRecord:
+ dream = DreamRecord(
+ id=str(uuid.uuid4()),
+ session_excerpt=session_excerpt,
+ decision_point=decision_point,
+ simulation=simulation,
+ proposed_rule=proposed_rule,
+ created_at=datetime.now(UTC).isoformat(),
+ )
+ with _get_conn(self._db_path) as conn:
+ conn.execute(
+ """
+ INSERT INTO dreams
+ (id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """,
+ (
+ dream.id,
+ dream.session_excerpt,
+ dream.decision_point,
+ dream.simulation,
+ dream.proposed_rule,
+ dream.created_at,
+ ),
+ )
+ conn.commit()
+ return dream
+
+ async def _broadcast_status(self) -> None:
+ """Push current dreaming status via WebSocket."""
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+
+ await ws_manager.broadcast("dreaming_state", self.get_status())
+ except Exception as exc:
+ logger.debug("Dreaming status broadcast failed: %s", exc)
+
+ async def _broadcast_dream(self, dream: DreamRecord) -> None:
+ """Push a completed dream record via WebSocket."""
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+
+ await ws_manager.broadcast(
+ "dreaming_complete",
+ {
+ "id": dream.id,
+ "proposed_rule": dream.proposed_rule,
+ "simulation": dream.simulation[:200],
+ "created_at": dream.created_at,
+ },
+ )
+ except Exception as exc:
+ logger.debug("Dreaming complete broadcast failed: %s", exc)
+
+
+# Module-level singleton
+dreaming_engine = DreamingEngine()
diff --git a/static/css/mission-control.css b/static/css/mission-control.css
index fc333da0..72866664 100644
--- a/static/css/mission-control.css
+++ b/static/css/mission-control.css
@@ -2549,6 +2549,7 @@
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
+
/* ── Voice settings ───────────────────────────────────────── */
.voice-settings-page { max-width: 600px; margin: 0 auto; }
@@ -2714,3 +2715,45 @@
padding: 0.3rem 0.6rem;
margin-bottom: 0.5rem;
}
+
+
+/* ═══════════════════════════════════════════════════════════════
+ Dreaming Mode
+ ═══════════════════════════════════════════════════════════════ */
+
+.dream-active {
+ display: flex; align-items: center; gap: 8px;
+ padding: 6px 0;
+}
+.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
+.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
+
+.dream-pulse {
+ display: inline-block; width: 8px; height: 8px; border-radius: 50%;
+ background: var(--purple);
+ animation: dream-pulse 1.8s ease-in-out infinite;
+}
+@keyframes dream-pulse {
+ 0%, 100% { opacity: 1; transform: scale(1); }
+ 50% { opacity: 0.4; transform: scale(0.7); }
+}
+
+.dream-dot {
+ display: inline-block; width: 7px; height: 7px; border-radius: 50%;
+}
+.dream-dot-idle { background: var(--amber); }
+.dream-dot-standby { background: var(--text-dim); }
+
+.dream-idle, .dream-standby {
+ display: flex; align-items: center; gap: 6px; padding: 4px 0;
+}
+.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
+.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
+.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
+
+.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
+.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
+.dream-record:last-child { border-bottom: none; }
+.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
+.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
+
diff --git a/tests/unit/test_dreaming.py b/tests/unit/test_dreaming.py
new file mode 100644
index 00000000..c47ec42b
--- /dev/null
+++ b/tests/unit/test_dreaming.py
@@ -0,0 +1,217 @@
+"""Unit tests for the Dreaming mode engine."""
+
+import sqlite3
+from contextlib import closing
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from timmy.dreaming import DreamingEngine, DreamRecord, _SESSION_GAP_SECONDS
+
+
+# ── Fixtures ──────────────────────────────────────────────────────────────────
+
+
+@pytest.fixture()
+def tmp_dreams_db(tmp_path):
+ """Return a temporary path for the dreams database."""
+ return tmp_path / "dreams.db"
+
+
+@pytest.fixture()
+def engine(tmp_dreams_db):
+ """DreamingEngine backed by a temp database."""
+ return DreamingEngine(db_path=tmp_dreams_db)
+
+
+@pytest.fixture()
+def chat_db(tmp_path):
+ """Create a minimal chat database with some messages."""
+ db_path = tmp_path / "chat.db"
+ with closing(sqlite3.connect(str(db_path))) as conn:
+ conn.execute("""
+ CREATE TABLE chat_messages (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ role TEXT NOT NULL,
+ content TEXT NOT NULL,
+ timestamp TEXT NOT NULL,
+ source TEXT NOT NULL DEFAULT 'browser'
+ )
+ """)
+ now = datetime.now(UTC)
+ messages = [
+ ("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
+ ("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
+ ("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
+ ("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
+ ("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
+ ]
+ conn.executemany(
+ "INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
+ messages,
+ )
+ conn.commit()
+ return db_path
+
+
+# ── Idle detection ─────────────────────────────────────────────────────────────
+
+
+class TestIdleDetection:
+ def test_not_idle_immediately(self, engine):
+ assert engine.is_idle() is False
+
+ def test_idle_after_threshold(self, engine):
+ engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_idle_threshold_minutes = 10
+ assert engine.is_idle() is True
+
+ def test_not_idle_when_threshold_zero(self, engine):
+ engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_idle_threshold_minutes = 0
+ assert engine.is_idle() is False
+
+ def test_record_activity_resets_timer(self, engine):
+ engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
+ engine.record_activity()
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_idle_threshold_minutes = 10
+ assert engine.is_idle() is False
+
+
+# ── Status dict ───────────────────────────────────────────────────────────────
+
+
+class TestGetStatus:
+ def test_status_shape(self, engine):
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_enabled = True
+ mock_settings.dreaming_idle_threshold_minutes = 10
+ status = engine.get_status()
+ assert "enabled" in status
+ assert "dreaming" in status
+ assert "idle" in status
+ assert "dream_count" in status
+ assert "idle_minutes" in status
+
+ def test_dream_count_starts_at_zero(self, engine):
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_enabled = True
+ mock_settings.dreaming_idle_threshold_minutes = 10
+ assert engine.get_status()["dream_count"] == 0
+
+
+# ── Session grouping ──────────────────────────────────────────────────────────
+
+
+class TestGroupIntoSessions:
+ def test_single_session(self, engine):
+ now = datetime.now(UTC)
+ rows = [
+ {"role": "user", "content": "hi", "timestamp": now.isoformat()},
+ {"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
+ ]
+ sessions = engine._group_into_sessions(rows)
+ assert len(sessions) == 1
+ assert len(sessions[0]) == 2
+
+ def test_splits_on_large_gap(self, engine):
+ now = datetime.now(UTC)
+ gap = _SESSION_GAP_SECONDS + 100
+ rows = [
+ {"role": "user", "content": "hi", "timestamp": now.isoformat()},
+ {"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
+ ]
+ sessions = engine._group_into_sessions(rows)
+ assert len(sessions) == 2
+
+ def test_empty_input(self, engine):
+ assert engine._group_into_sessions([]) == []
+
+
+# ── Dream storage ─────────────────────────────────────────────────────────────
+
+
+class TestDreamStorage:
+ def test_store_and_retrieve(self, engine):
+ dream = engine._store_dream(
+ session_excerpt="User asked about Python.",
+ decision_point="Python uses try/except blocks.",
+ simulation="I could have given a code example.",
+ proposed_rule="When explaining errors, include a code snippet.",
+ )
+ assert dream.id
+ assert dream.proposed_rule == "When explaining errors, include a code snippet."
+
+ retrieved = engine.get_recent_dreams(limit=1)
+ assert len(retrieved) == 1
+ assert retrieved[0].id == dream.id
+
+ def test_count_increments(self, engine):
+ assert engine.count_dreams() == 0
+ engine._store_dream(
+ session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
+ )
+ assert engine.count_dreams() == 1
+
+
+# ── dream_once integration ─────────────────────────────────────────────────────
+
+
+class TestDreamOnce:
+ @pytest.mark.asyncio
+ async def test_skips_when_disabled(self, engine):
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_enabled = False
+ result = await engine.dream_once()
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_skips_when_not_idle(self, engine):
+ engine._last_activity_time = datetime.now(UTC)
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_enabled = True
+ mock_settings.dreaming_idle_threshold_minutes = 60
+ result = await engine.dream_once()
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_skips_when_already_dreaming(self, engine):
+ engine._is_dreaming = True
+ with patch("timmy.dreaming.settings") as mock_settings:
+ mock_settings.dreaming_enabled = True
+ mock_settings.dreaming_idle_threshold_minutes = 0
+ result = await engine.dream_once()
+ # Reset for cleanliness
+ engine._is_dreaming = False
+ assert result is None
+
+ @pytest.mark.asyncio
+ async def test_dream_produces_record_when_idle(self, engine, chat_db):
+ """Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
+ engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
+
+ with (
+ patch("timmy.dreaming.settings") as mock_settings,
+ patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
+ patch("infrastructure.chat_store.DB_PATH", chat_db),
+ ):
+ mock_settings.dreaming_enabled = True
+ mock_settings.dreaming_idle_threshold_minutes = 10
+ mock_settings.dreaming_timeout_seconds = 30
+ mock_agent.side_effect = [
+ "I could have provided a concrete try/except example.", # simulation
+ "When explaining errors, always include a runnable code snippet.", # rule
+ ]
+
+ result = await engine.dream_once()
+
+ assert result is not None
+ assert isinstance(result, DreamRecord)
+ assert result.simulation
+ assert result.proposed_rule
+ assert engine.count_dreams() == 1