1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
30376d1bb4 feat: Agent Dreaming Mode — idle background reflection loop (#1019)
- Add src/timmy/dreaming.py with DreamingEngine: rule synthesis from
  session logs during idle periods with configurable wake/sleep thresholds
- Add src/dashboard/routes/dreaming.py with REST endpoints for
  dreaming status, history, and manual trigger
- Add dreaming_status.html partial template for HTMX polling
- Wire dreaming_router into dashboard app
- Add dreaming CSS styles to mission-control.css
- Add dreaming_enabled/dreaming_idle_threshold_s config settings
- 17 unit tests all passing

Fixes #1019

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 18:56:21 -04:00
10 changed files with 852 additions and 921 deletions

View File

@@ -300,6 +300,14 @@ class Settings(BaseSettings):
thinking_memory_check_every: int = 50 # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
# ── Dreaming Mode ─────────────────────────────────────────────────
# When enabled, the agent replays past sessions during idle time to
# simulate alternative actions and propose behavioural rules.
dreaming_enabled: bool = True
dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
dreaming_cycle_seconds: int = 600 # seconds between dream attempts
dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.

View File

@@ -57,6 +57,7 @@ from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.dreaming import router as dreaming_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import matrix_router
from dashboard.routes.world import router as world_router
@@ -249,6 +250,36 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
async def _dreaming_scheduler() -> None:
"""Background task: run idle-time dreaming cycles.
When the system has been idle for ``dreaming_idle_threshold_minutes``,
the dreaming engine replays a past session and simulates alternatives.
"""
from timmy.dreaming import dreaming_engine
await asyncio.sleep(15) # Stagger after loop QA scheduler
while True:
try:
if settings.dreaming_enabled:
await asyncio.wait_for(
dreaming_engine.dream_once(),
timeout=settings.dreaming_timeout_seconds + 10,
)
except TimeoutError:
logger.warning(
"Dreaming cycle timed out after %ds",
settings.dreaming_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Dreaming scheduler error: %s", exc)
await asyncio.sleep(settings.dreaming_cycle_seconds)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -409,6 +440,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_dreaming_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
@@ -549,28 +581,12 @@ async def lifespan(app: FastAPI):
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
app = FastAPI(
title="Mission Control",
@@ -692,6 +708,7 @@ app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(dreaming_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,84 @@
"""Dreaming mode dashboard routes.
GET /dreaming/api/status — JSON status of the dreaming engine
GET /dreaming/api/recent — JSON list of recent dream records
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
GET /dreaming/partial — HTMX partial: dreaming status panel
"""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.templating import templates
from timmy.dreaming import dreaming_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
@router.get("/api/status", response_class=JSONResponse)
async def dreaming_status():
"""Return current dreaming engine status as JSON."""
return dreaming_engine.get_status()
@router.get("/api/recent", response_class=JSONResponse)
async def dreaming_recent(limit: int = 10):
"""Return recent dream records as JSON."""
dreams = dreaming_engine.get_recent_dreams(limit=limit)
return [
{
"id": d.id,
"session_excerpt": d.session_excerpt[:200],
"decision_point": d.decision_point[:200],
"simulation": d.simulation,
"proposed_rule": d.proposed_rule,
"created_at": d.created_at,
}
for d in dreams
]
@router.post("/api/trigger", response_class=JSONResponse)
async def dreaming_trigger():
"""Manually trigger a dream cycle (bypasses idle check).
Useful for testing and manual inspection. Forces idle state temporarily.
"""
from datetime import UTC, datetime, timedelta
from config import settings
# Temporarily back-date last activity to appear idle
original_time = dreaming_engine._last_activity_time
dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
minutes=settings.dreaming_idle_threshold_minutes + 1
)
try:
dream = await dreaming_engine.dream_once()
finally:
dreaming_engine._last_activity_time = original_time
if dream:
return {
"status": "ok",
"dream_id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
}
return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
@router.get("/partial", response_class=HTMLResponse)
async def dreaming_partial(request: Request):
"""HTMX partial: dreaming status panel for the dashboard."""
status = dreaming_engine.get_status()
recent = dreaming_engine.get_recent_dreams(limit=5)
return templates.TemplateResponse(
request,
"partials/dreaming_status.html",
{"status": status, "recent_dreams": recent},
)

View File

@@ -0,0 +1,32 @@
{% if not status.enabled %}
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
{% elif status.dreaming %}
<div class="dream-active">
<span class="dream-pulse"></span>
<span class="dream-label">DREAMING</span>
<div class="dream-summary">{{ status.current_summary }}</div>
</div>
{% elif status.idle %}
<div class="dream-idle">
<span class="dream-dot dream-dot-idle"></span>
<span class="dream-label-idle">IDLE</span>
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
</div>
{% else %}
<div class="dream-standby">
<span class="dream-dot dream-dot-standby"></span>
<span class="dream-label-standby">STANDBY</span>
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
</div>
{% endif %}
{% if recent_dreams %}
<div class="dream-history mt-2">
{% for d in recent_dreams %}
<div class="dream-record">
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
</div>
{% endfor %}
</div>
{% endif %}

435
src/timmy/dreaming.py Normal file
View File

@@ -0,0 +1,435 @@
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
When the dashboard has been idle for a configurable period, this engine
selects a past chat session, identifies key agent response points, and
asks the LLM to simulate alternative approaches. Insights are stored as
proposed rules that can feed the auto-crystallizer or memory system.
Usage::
from timmy.dreaming import dreaming_engine
# Run one dream cycle (called by the background scheduler)
await dreaming_engine.dream_once()
# Query recent dreams
dreams = dreaming_engine.get_recent_dreams(limit=10)
# Get current status dict for API/dashboard
status = dreaming_engine.get_status()
"""
import json
import logging
import re
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path("data/dreams.db")
# Strip <think> tags from reasoning model output
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Minimum messages in a session to be worth replaying
_MIN_SESSION_MESSAGES = 3
# Gap in seconds between messages that signals a new session
_SESSION_GAP_SECONDS = 1800 # 30 minutes
@dataclass
class DreamRecord:
"""A single completed dream cycle."""
id: str
session_excerpt: str # Short excerpt from the replayed session
decision_point: str # The agent message that was re-simulated
simulation: str # The alternative response generated
proposed_rule: str # Rule extracted from the simulation
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS dreams (
id TEXT PRIMARY KEY,
session_excerpt TEXT NOT NULL,
decision_point TEXT NOT NULL,
simulation TEXT NOT NULL,
proposed_rule TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
conn.commit()
yield conn
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
return DreamRecord(
id=row["id"],
session_excerpt=row["session_excerpt"],
decision_point=row["decision_point"],
simulation=row["simulation"],
proposed_rule=row["proposed_rule"],
created_at=row["created_at"],
)
class DreamingEngine:
"""Idle-time dreaming engine — replays sessions and simulates alternatives."""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_activity_time: datetime = datetime.now(UTC)
self._is_dreaming: bool = False
self._current_dream_summary: str = ""
self._dreaming_agent = None # Lazy-initialised
# ── Public API ────────────────────────────────────────────────────────
def record_activity(self) -> None:
"""Reset the idle timer — call this on every user/agent interaction."""
self._last_activity_time = datetime.now(UTC)
def is_idle(self) -> bool:
"""Return True if the system has been idle long enough to start dreaming."""
threshold = settings.dreaming_idle_threshold_minutes
if threshold <= 0:
return False
return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
def get_status(self) -> dict[str, Any]:
"""Return a status dict suitable for API/dashboard consumption."""
return {
"enabled": settings.dreaming_enabled,
"dreaming": self._is_dreaming,
"idle": self.is_idle(),
"current_summary": self._current_dream_summary,
"idle_minutes": int(
(datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
),
"idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
"dream_count": self.count_dreams(),
}
async def dream_once(self) -> DreamRecord | None:
"""Execute one dream cycle.
Returns the stored DreamRecord, or None if the cycle was skipped
(not idle, dreaming disabled, no suitable session, or LLM error).
"""
if not settings.dreaming_enabled:
return None
if not self.is_idle():
logger.debug(
"Dreaming skipped — system active (idle for %d min, threshold %d min)",
int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
settings.dreaming_idle_threshold_minutes,
)
return None
if self._is_dreaming:
logger.debug("Dreaming skipped — cycle already in progress")
return None
self._is_dreaming = True
self._current_dream_summary = "Selecting a past session…"
await self._broadcast_status()
try:
return await self._run_dream_cycle()
except Exception as exc:
logger.warning("Dream cycle failed: %s", exc)
return None
finally:
self._is_dreaming = False
self._current_dream_summary = ""
await self._broadcast_status()
def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
"""Retrieve the most recent dream records."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_dream(r) for r in rows]
def count_dreams(self) -> int:
"""Return total number of stored dream records."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
return row["c"] if row else 0
# ── Private helpers ───────────────────────────────────────────────────
async def _run_dream_cycle(self) -> DreamRecord | None:
"""Core dream logic: select → simulate → store."""
# 1. Select a past session from the chat log
session = await self._select_session()
if not session:
logger.debug("No suitable chat session found for dreaming")
self._current_dream_summary = "No past sessions to replay"
return None
decision_point, session_excerpt = session
self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}"
await self._broadcast_status()
# 2. Simulate an alternative response
simulation = await self._simulate_alternative(decision_point, session_excerpt)
if not simulation:
logger.debug("Dream simulation produced no output")
return None
# 3. Extract a proposed rule
proposed_rule = await self._extract_rule(decision_point, simulation)
# 4. Store and broadcast
dream = self._store_dream(
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
)
self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
logger.info(
"Dream [%s]: replayed session, proposed rule: %s",
dream.id[:8],
proposed_rule[:80] if proposed_rule else "(none)",
)
await self._broadcast_status()
await self._broadcast_dream(dream)
return dream
async def _select_session(self) -> tuple[str, str] | None:
"""Select a past chat session and return (decision_point, session_excerpt).
Uses the SQLite chat store. Groups messages into sessions by time
gap. Picks a random session with enough messages, then selects one
agent response as the decision point.
"""
try:
from infrastructure.chat_store import DB_PATH
if not DB_PATH.exists():
return None
import asyncio
rows = await asyncio.to_thread(self._load_chat_rows)
if not rows:
return None
sessions = self._group_into_sessions(rows)
if not sessions:
return None
# Filter sessions with enough messages
valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
if not valid:
return None
import random
session = random.choice(valid) # noqa: S311 (not cryptographic)
# Build a short text excerpt (last N messages)
excerpt_msgs = session[-6:]
excerpt = "\n".join(
f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
)
# Find agent responses as candidate decision points
agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
if not agent_msgs:
return None
decision = random.choice(agent_msgs) # noqa: S311
return decision["content"], excerpt
except Exception as exc:
logger.warning("Session selection failed: %s", exc)
return None
def _load_chat_rows(self) -> list[dict]:
"""Synchronously load chat messages from SQLite."""
from infrastructure.chat_store import DB_PATH
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, timestamp FROM chat_messages "
"ORDER BY timestamp ASC"
).fetchall()
return [dict(r) for r in rows]
def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
"""Group chat rows into sessions based on time gaps."""
if not rows:
return []
sessions: list[list[dict]] = []
current: list[dict] = [rows[0]]
for prev, curr in zip(rows, rows[1:]):
try:
t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
gap = (t_curr - t_prev).total_seconds()
except Exception:
gap = 0
if gap > _SESSION_GAP_SECONDS:
sessions.append(current)
current = [curr]
else:
current.append(curr)
sessions.append(current)
return sessions
async def _simulate_alternative(
self, decision_point: str, session_excerpt: str
) -> str:
"""Ask the LLM to simulate an alternative response."""
prompt = (
"You are Timmy, a sovereign AI agent in a dreaming state.\n"
"You are replaying a past conversation and exploring what you could "
"have done differently at a key decision point.\n\n"
"PAST SESSION EXCERPT:\n"
f"{session_excerpt}\n\n"
"KEY DECISION POINT (your past response):\n"
f"{decision_point[:500]}\n\n"
"TASK: In 2-3 sentences, describe ONE concrete alternative approach "
"you could have taken at this decision point that would have been "
"more helpful, more accurate, or more efficient.\n"
"Be specific — reference the actual content of the conversation.\n"
"Do NOT include meta-commentary about dreaming or this exercise.\n\n"
"Alternative approach:"
)
raw = await self._call_agent(prompt)
return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
async def _extract_rule(self, decision_point: str, simulation: str) -> str:
"""Extract a proposed behaviour rule from the simulation."""
prompt = (
"Given this pair of agent responses:\n\n"
f"ORIGINAL: {decision_point[:300]}\n\n"
f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
"Extract ONE concise rule (max 20 words) that captures what to do "
"differently next time. Format: 'When X, do Y instead of Z.'\n"
"Rule:"
)
raw = await self._call_agent(prompt)
rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
# Keep only the first sentence/line
rule = rule.split("\n")[0].strip().rstrip(".")
return rule[:200] # Safety cap
async def _call_agent(self, prompt: str) -> str:
"""Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
import asyncio
if self._dreaming_agent is None:
from timmy.agent import create_timmy
self._dreaming_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(settings.dreaming_timeout_seconds):
run = await self._dreaming_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
return ""
except Exception as exc:
logger.warning("Dreaming LLM call failed: %s", exc)
return ""
raw = run.content if hasattr(run, "content") else str(run)
return raw or ""
def _store_dream(
self,
*,
session_excerpt: str,
decision_point: str,
simulation: str,
proposed_rule: str,
) -> DreamRecord:
dream = DreamRecord(
id=str(uuid.uuid4()),
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
created_at=datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO dreams
(id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
dream.id,
dream.session_excerpt,
dream.decision_point,
dream.simulation,
dream.proposed_rule,
dream.created_at,
),
)
conn.commit()
return dream
async def _broadcast_status(self) -> None:
"""Push current dreaming status via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("dreaming_state", self.get_status())
except Exception as exc:
logger.debug("Dreaming status broadcast failed: %s", exc)
async def _broadcast_dream(self, dream: DreamRecord) -> None:
"""Push a completed dream record via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"dreaming_complete",
{
"id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
"created_at": dream.created_at,
},
)
except Exception as exc:
logger.debug("Dreaming complete broadcast failed: %s", exc)
# Module-level singleton
dreaming_engine = DreamingEngine()

View File

@@ -4,23 +4,4 @@ Tracks how much of each AI layer (perception, decision, narration)
runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard.
Refs: #954, #953
Session reporting: auto-generates markdown scorecards at session end
and commits them to the Gitea repo for institutional memory.
Refs: #957 (Session Sovereignty Report Generator)
"""
from timmy.sovereignty.session_report import (
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
__all__ = [
"generate_report",
"commit_report",
"generate_and_commit_report",
"mark_session_start",
]

View File

@@ -1,442 +0,0 @@
"""Session Sovereignty Report Generator.
Auto-generates a sovereignty scorecard at the end of each play session
and commits it as a markdown file to the Gitea repo under
``reports/sovereignty/``.
Report contents (per issue #957):
- Session duration + game played
- Total model calls by type (VLM, LLM, TTS, API)
- Total cache/rule hits by type
- New skills crystallized (placeholder — pending skill-tracking impl)
- Sovereignty delta (change from session start → end)
- Cost breakdown (actual API spend)
- Per-layer sovereignty %: perception, decision, narration
- Trend comparison vs previous session
Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop)
"""
import base64
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import httpx
from config import settings
# Optional module-level imports — degrade gracefully if unavailable at import time
try:
from timmy.session_logger import get_session_logger
except Exception: # ImportError or circular import during early startup
get_session_logger = None # type: ignore[assignment]
try:
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
except Exception:
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
get_sovereignty_store = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# Module-level session start time; set by mark_session_start()
_SESSION_START: datetime | None = None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def mark_session_start() -> None:
"""Record the session start wall-clock time.
Call once during application startup so ``generate_report()`` can
compute accurate session durations.
"""
global _SESSION_START
_SESSION_START = datetime.now(UTC)
logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat())
def generate_report(session_id: str = "dashboard") -> str:
"""Render a sovereignty scorecard as a markdown string.
Pulls from:
- ``timmy.session_logger`` — message/tool-call/error counts
- ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost,
graduation phase, and trend data
Args:
session_id: The session identifier (default: "dashboard").
Returns:
Markdown-formatted sovereignty report string.
"""
now = datetime.now(UTC)
session_start = _SESSION_START or now
duration_secs = (now - session_start).total_seconds()
session_data = _gather_session_data()
sov_data = _gather_sovereignty_data()
return _render_markdown(now, session_id, duration_secs, session_data, sov_data)
def commit_report(report_md: str, session_id: str = "dashboard") -> bool:
"""Commit a sovereignty report to the Gitea repo.
Creates or updates ``reports/sovereignty/{date}_{session_id}.md``
via the Gitea Contents API. Degrades gracefully: logs a warning
and returns ``False`` if Gitea is unreachable or misconfigured.
Args:
report_md: Markdown content to commit.
session_id: Session identifier used in the filename.
Returns:
``True`` on success, ``False`` on failure.
"""
if not settings.gitea_enabled:
logger.info("Sovereignty: Gitea disabled — skipping report commit")
return False
if not settings.gitea_token:
logger.warning("Sovereignty: no Gitea token — skipping report commit")
return False
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
file_path = f"reports/sovereignty/{date_str}_{session_id}.md"
url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}"
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
encoded_content = base64.b64encode(report_md.encode()).decode()
commit_message = (
f"report: sovereignty session {session_id} ({date_str})\n\n"
f"Auto-generated by Timmy. Refs #957"
)
payload: dict[str, Any] = {
"message": commit_message,
"content": encoded_content,
}
try:
with httpx.Client(timeout=10.0) as client:
# Fetch existing file SHA so we can update rather than create
check = client.get(url, headers=headers)
if check.status_code == 200:
existing = check.json()
payload["sha"] = existing.get("sha", "")
resp = client.put(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("Sovereignty: report committed to %s", file_path)
return True
except httpx.HTTPStatusError as exc:
logger.warning(
"Sovereignty: commit failed (HTTP %s): %s",
exc.response.status_code,
exc,
)
return False
except Exception as exc:
logger.warning("Sovereignty: commit failed: %s", exc)
return False
async def generate_and_commit_report(session_id: str = "dashboard") -> bool:
"""Generate and commit a sovereignty report for the current session.
Primary entry point — call at session end / application shutdown.
Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread``
so it does not block the event loop.
Args:
session_id: The session identifier.
Returns:
``True`` if the report was generated and committed successfully.
"""
import asyncio
try:
report_md = generate_report(session_id)
logger.info("Sovereignty: report generated (%d chars)", len(report_md))
committed = await asyncio.to_thread(commit_report, report_md, session_id)
return committed
except Exception as exc:
logger.warning("Sovereignty: report generation failed: %s", exc)
return False
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string."""
total = int(seconds)
hours, remainder = divmod(total, 3600)
minutes, secs = divmod(remainder, 60)
if hours:
return f"{hours}h {minutes}m {secs}s"
if minutes:
return f"{minutes}m {secs}s"
return f"{secs}s"
def _gather_session_data() -> dict[str, Any]:
"""Pull session statistics from the session logger.
Returns a dict with:
- ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors``
- ``tool_call_breakdown``: dict[tool_name, count]
"""
default: dict[str, Any] = {
"user_messages": 0,
"timmy_messages": 0,
"tool_calls": 0,
"errors": 0,
"tool_call_breakdown": {},
}
try:
if get_session_logger is None:
return default
sl = get_session_logger()
sl.flush()
# Read today's session file directly for accurate counts
if not sl.session_file.exists():
return default
entries: list[dict] = []
with open(sl.session_file) as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
tool_breakdown: dict[str, int] = {}
user_msgs = timmy_msgs = tool_calls = errors = 0
for entry in entries:
etype = entry.get("type")
if etype == "message":
if entry.get("role") == "user":
user_msgs += 1
elif entry.get("role") == "timmy":
timmy_msgs += 1
elif etype == "tool_call":
tool_calls += 1
tool_name = entry.get("tool", "unknown")
tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1
elif etype == "error":
errors += 1
return {
"user_messages": user_msgs,
"timmy_messages": timmy_msgs,
"tool_calls": tool_calls,
"errors": errors,
"tool_call_breakdown": tool_breakdown,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather session data: %s", exc)
return default
def _gather_sovereignty_data() -> dict[str, Any]:
"""Pull sovereignty metrics from the SQLite store.
Returns a dict with:
- ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()``
- ``deltas``: per-metric start/end values within recent history window
- ``previous_session``: most recent prior value for each metric
"""
try:
if get_sovereignty_store is None:
return {"metrics": {}, "deltas": {}, "previous_session": {}}
store = get_sovereignty_store()
summary = store.get_summary()
deltas: dict[str, dict[str, Any]] = {}
previous_session: dict[str, float | None] = {}
for metric_type in GRADUATION_TARGETS:
history = store.get_latest(metric_type, limit=10)
if len(history) >= 2:
deltas[metric_type] = {
"start": history[-1]["value"],
"end": history[0]["value"],
}
previous_session[metric_type] = history[1]["value"]
elif len(history) == 1:
deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]}
previous_session[metric_type] = None
else:
deltas[metric_type] = {"start": None, "end": None}
previous_session[metric_type] = None
return {
"metrics": summary,
"deltas": deltas,
"previous_session": previous_session,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc)
return {"metrics": {}, "deltas": {}, "previous_session": {}}
def _render_markdown(
now: datetime,
session_id: str,
duration_secs: float,
session_data: dict[str, Any],
sov_data: dict[str, Any],
) -> str:
"""Assemble the full sovereignty report in markdown."""
lines: list[str] = []
# Header
lines += [
"# Sovereignty Session Report",
"",
f"**Session ID:** `{session_id}` ",
f"**Date:** {now.strftime('%Y-%m-%d')} ",
f"**Duration:** {_format_duration(duration_secs)} ",
f"**Generated:** {now.isoformat()}",
"",
"---",
"",
]
# Session activity
lines += [
"## Session Activity",
"",
"| Metric | Count |",
"|--------|-------|",
f"| User messages | {session_data['user_messages']} |",
f"| Timmy responses | {session_data['timmy_messages']} |",
f"| Tool calls | {session_data['tool_calls']} |",
f"| Errors | {session_data['errors']} |",
"",
]
tool_breakdown = session_data.get("tool_call_breakdown", {})
if tool_breakdown:
lines += ["### Model Calls by Tool", ""]
for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
lines.append(f"- `{tool_name}`: {count}")
lines.append("")
# Sovereignty scorecard
lines += [
"## Sovereignty Scorecard",
"",
"| Metric | Current | Target (graduation) | Phase |",
"|--------|---------|---------------------|-------|",
]
for metric_type, data in sov_data["metrics"].items():
current = data.get("current")
current_str = f"{current:.4f}" if current is not None else "N/A"
grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation")
grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A"
phase = data.get("phase", "unknown")
lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |")
lines += ["", "### Sovereignty Delta (This Session)", ""]
for metric_type, delta_info in sov_data.get("deltas", {}).items():
start_val = delta_info.get("start")
end_val = delta_info.get("end")
if start_val is not None and end_val is not None:
diff = end_val - start_val
sign = "+" if diff >= 0 else ""
lines.append(
f"- **{metric_type}**: {start_val:.4f}{end_val:.4f} ({sign}{diff:.4f})"
)
else:
lines.append(f"- **{metric_type}**: N/A (no data recorded)")
# Cost breakdown
lines += ["", "## Cost Breakdown", ""]
api_cost_data = sov_data["metrics"].get("api_cost", {})
current_cost = api_cost_data.get("current")
if current_cost is not None:
lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}")
else:
lines.append("- **Total API spend:** N/A (no data recorded)")
lines.append("")
# Per-layer sovereignty
lines += [
"## Per-Layer Sovereignty",
"",
"| Layer | Sovereignty % |",
"|-------|--------------|",
"| Perception (VLM) | N/A |",
"| Decision (LLM) | N/A |",
"| Narration (TTS) | N/A |",
"",
"> Per-layer tracking requires instrumented inference calls. See #957.",
"",
]
# Skills crystallized
lines += [
"## Skills Crystallized",
"",
"_Skill crystallization tracking not yet implemented. See #957._",
"",
]
# Trend vs previous session
lines += ["## Trend vs Previous Session", ""]
prev_data = sov_data.get("previous_session", {})
has_prev = any(v is not None for v in prev_data.values())
if has_prev:
lines += [
"| Metric | Previous | Current | Change |",
"|--------|----------|---------|--------|",
]
for metric_type, curr_info in sov_data["metrics"].items():
curr_val = curr_info.get("current")
prev_val = prev_data.get(metric_type)
curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A"
prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A"
if curr_val is not None and prev_val is not None:
diff = curr_val - prev_val
sign = "+" if diff >= 0 else ""
change_str = f"{sign}{diff:.4f}"
else:
change_str = "N/A"
lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |")
lines.append("")
else:
lines += ["_No previous session data available for comparison._", ""]
# Footer
lines += [
"---",
"_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_",
]
return "\n".join(lines)

View File

@@ -2549,6 +2549,7 @@
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
/* ── Voice settings ───────────────────────────────────────── */
.voice-settings-page { max-width: 600px; margin: 0 auto; }
@@ -2714,3 +2715,45 @@
padding: 0.3rem 0.6rem;
margin-bottom: 0.5rem;
}
/* ═══════════════════════════════════════════════════════════════
Dreaming Mode
═══════════════════════════════════════════════════════════════ */
.dream-active {
display: flex; align-items: center; gap: 8px;
padding: 6px 0;
}
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
.dream-pulse {
display: inline-block; width: 8px; height: 8px; border-radius: 50%;
background: var(--purple);
animation: dream-pulse 1.8s ease-in-out infinite;
}
@keyframes dream-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(0.7); }
}
.dream-dot {
display: inline-block; width: 7px; height: 7px; border-radius: 50%;
}
.dream-dot-idle { background: var(--amber); }
.dream-dot-standby { background: var(--text-dim); }
.dream-idle, .dream-standby {
display: flex; align-items: center; gap: 6px; padding: 4px 0;
}
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
.dream-record:last-child { border-bottom: none; }
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }

View File

@@ -1,444 +0,0 @@
"""Tests for timmy.sovereignty.session_report.
Refs: #957 (Session Sovereignty Report Generator)
"""
import base64
import json
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
from timmy.sovereignty.session_report import (
_format_duration,
_gather_session_data,
_gather_sovereignty_data,
_render_markdown,
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
# ---------------------------------------------------------------------------
# _format_duration
# ---------------------------------------------------------------------------
class TestFormatDuration:
def test_seconds_only(self):
assert _format_duration(45) == "45s"
def test_minutes_and_seconds(self):
assert _format_duration(125) == "2m 5s"
def test_hours_minutes_seconds(self):
assert _format_duration(3661) == "1h 1m 1s"
def test_zero(self):
assert _format_duration(0) == "0s"
# ---------------------------------------------------------------------------
# mark_session_start + generate_report (smoke)
# ---------------------------------------------------------------------------
class TestMarkSessionStart:
def test_sets_session_start(self):
import timmy.sovereignty.session_report as sr
sr._SESSION_START = None
mark_session_start()
assert sr._SESSION_START is not None
assert sr._SESSION_START.tzinfo == UTC
def test_idempotent_overwrite(self):
import timmy.sovereignty.session_report as sr
mark_session_start()
first = sr._SESSION_START
time.sleep(0.01)
mark_session_start()
second = sr._SESSION_START
assert second >= first
# ---------------------------------------------------------------------------
# _gather_session_data
# ---------------------------------------------------------------------------
class TestGatherSessionData:
def test_returns_defaults_when_no_file(self, tmp_path):
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = tmp_path / "nonexistent.jsonl"
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 0
assert data["timmy_messages"] == 0
assert data["tool_calls"] == 0
assert data["errors"] == 0
assert data["tool_call_breakdown"] == {}
def test_counts_entries_correctly(self, tmp_path):
session_file = tmp_path / "session_2026-03-23.jsonl"
entries = [
{"type": "message", "role": "user", "content": "hello"},
{"type": "message", "role": "timmy", "content": "hi"},
{"type": "message", "role": "user", "content": "test"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"},
{"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"},
{"type": "error", "error": "boom"},
]
with open(session_file, "w") as f:
for e in entries:
f.write(json.dumps(e) + "\n")
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = session_file
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 2
assert data["timmy_messages"] == 1
assert data["tool_calls"] == 3
assert data["errors"] == 1
assert data["tool_call_breakdown"]["memory_search"] == 2
assert data["tool_call_breakdown"]["shell"] == 1
def test_graceful_on_import_error(self):
with patch(
"timmy.sovereignty.session_report.get_session_logger",
side_effect=ImportError("no session_logger"),
):
data = _gather_session_data()
assert data["tool_calls"] == 0
# ---------------------------------------------------------------------------
# _gather_sovereignty_data
# ---------------------------------------------------------------------------
class TestGatherSovereigntyData:
def test_returns_empty_on_import_error(self):
with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}):
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
side_effect=ImportError("no store"),
):
data = _gather_sovereignty_data()
assert data["metrics"] == {}
assert data["deltas"] == {}
assert data["previous_session"] == {}
def test_populates_deltas_from_history(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {
"cache_hit_rate": {"current": 0.5, "phase": "week1"},
}
# get_latest returns newest-first
mock_store.get_latest.return_value = [
{"value": 0.5},
{"value": 0.3},
{"value": 0.1},
]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"cache_hit_rate": {"graduation": 0.9}},
):
data = _gather_sovereignty_data()
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {}
mock_store.get_latest.return_value = [{"value": 0.4}]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"api_cost": {"graduation": 0.01}},
):
data = _gather_sovereignty_data()
delta = data["deltas"]["api_cost"]
assert delta["start"] == 0.4
assert delta["end"] == 0.4
assert data["previous_session"]["api_cost"] is None
# ---------------------------------------------------------------------------
# generate_report (integration — smoke test)
# ---------------------------------------------------------------------------
class TestGenerateReport:
def _minimal_session_data(self):
return {
"user_messages": 3,
"timmy_messages": 3,
"tool_calls": 2,
"errors": 0,
"tool_call_breakdown": {"memory_search": 2},
}
def _minimal_sov_data(self):
return {
"metrics": {
"cache_hit_rate": {"current": 0.45, "phase": "week1"},
"api_cost": {"current": 0.12, "phase": "pre-start"},
},
"deltas": {
"cache_hit_rate": {"start": 0.40, "end": 0.45},
"api_cost": {"start": 0.10, "end": 0.12},
},
"previous_session": {
"cache_hit_rate": 0.40,
"api_cost": 0.10,
},
}
def test_smoke_produces_markdown(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report("test-session")
assert "# Sovereignty Session Report" in report
assert "test-session" in report
assert "## Session Activity" in report
assert "## Sovereignty Scorecard" in report
assert "## Cost Breakdown" in report
assert "## Trend vs Previous Session" in report
def test_report_contains_session_stats(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report()
assert "| User messages | 3 |" in report
assert "memory_search" in report
def test_report_no_previous_session(self):
sov = self._minimal_sov_data()
sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None}
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=sov,
),
):
report = generate_report()
assert "No previous session data" in report
# ---------------------------------------------------------------------------
# commit_report
# ---------------------------------------------------------------------------
class TestCommitReport:
def test_returns_false_when_gitea_disabled(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = False
result = commit_report("# test", "dashboard")
assert result is False
def test_returns_false_when_no_token(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = commit_report("# test", "dashboard")
assert result is False
def test_creates_file_via_put(self):
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.raise_for_status.return_value = None
mock_check = MagicMock()
mock_check.status_code = 404 # file does not exist yet
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# report content", "dashboard")
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded
def test_updates_existing_file_with_sha(self):
mock_check = MagicMock()
mock_check.status_code = 200
mock_check.json.return_value = {"sha": "abc123"}
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# updated", "dashboard")
assert result is True
payload = mock_client.put.call_args.kwargs.get("json", {})
assert payload.get("sha") == "abc123"
def test_returns_false_on_http_error(self):
import httpx
mock_check = MagicMock()
mock_check.status_code = 404
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.side_effect = httpx.HTTPStatusError(
"403", request=MagicMock(), response=MagicMock(status_code=403)
)
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# test", "dashboard")
assert result is False
# ---------------------------------------------------------------------------
# generate_and_commit_report (async)
# ---------------------------------------------------------------------------
class TestGenerateAndCommitReport:
async def test_returns_true_on_success(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=True,
),
):
result = await generate_and_commit_report("test")
assert result is True
async def test_returns_false_when_commit_fails(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=False,
),
):
result = await generate_and_commit_report()
assert result is False
async def test_graceful_on_exception(self):
with patch(
"timmy.sovereignty.session_report.generate_report",
side_effect=RuntimeError("explode"),
):
result = await generate_and_commit_report()
assert result is False

217
tests/unit/test_dreaming.py Normal file
View File

@@ -0,0 +1,217 @@
"""Unit tests for the Dreaming mode engine."""
import sqlite3
from contextlib import closing
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dreaming import DreamingEngine, DreamRecord, _SESSION_GAP_SECONDS
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def tmp_dreams_db(tmp_path):
"""Return a temporary path for the dreams database."""
return tmp_path / "dreams.db"
@pytest.fixture()
def engine(tmp_dreams_db):
"""DreamingEngine backed by a temp database."""
return DreamingEngine(db_path=tmp_dreams_db)
@pytest.fixture()
def chat_db(tmp_path):
"""Create a minimal chat database with some messages."""
db_path = tmp_path / "chat.db"
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute("""
CREATE TABLE chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'browser'
)
""")
now = datetime.now(UTC)
messages = [
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
]
conn.executemany(
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
messages,
)
conn.commit()
return db_path
# ── Idle detection ─────────────────────────────────────────────────────────────
class TestIdleDetection:
def test_not_idle_immediately(self, engine):
assert engine.is_idle() is False
def test_idle_after_threshold(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is True
def test_not_idle_when_threshold_zero(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 0
assert engine.is_idle() is False
def test_record_activity_resets_timer(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
engine.record_activity()
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is False
# ── Status dict ───────────────────────────────────────────────────────────────
class TestGetStatus:
def test_status_shape(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
status = engine.get_status()
assert "enabled" in status
assert "dreaming" in status
assert "idle" in status
assert "dream_count" in status
assert "idle_minutes" in status
def test_dream_count_starts_at_zero(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.get_status()["dream_count"] == 0
# ── Session grouping ──────────────────────────────────────────────────────────
class TestGroupIntoSessions:
def test_single_session(self, engine):
now = datetime.now(UTC)
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 1
assert len(sessions[0]) == 2
def test_splits_on_large_gap(self, engine):
now = datetime.now(UTC)
gap = _SESSION_GAP_SECONDS + 100
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 2
def test_empty_input(self, engine):
assert engine._group_into_sessions([]) == []
# ── Dream storage ─────────────────────────────────────────────────────────────
class TestDreamStorage:
def test_store_and_retrieve(self, engine):
dream = engine._store_dream(
session_excerpt="User asked about Python.",
decision_point="Python uses try/except blocks.",
simulation="I could have given a code example.",
proposed_rule="When explaining errors, include a code snippet.",
)
assert dream.id
assert dream.proposed_rule == "When explaining errors, include a code snippet."
retrieved = engine.get_recent_dreams(limit=1)
assert len(retrieved) == 1
assert retrieved[0].id == dream.id
def test_count_increments(self, engine):
assert engine.count_dreams() == 0
engine._store_dream(
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
)
assert engine.count_dreams() == 1
# ── dream_once integration ─────────────────────────────────────────────────────
class TestDreamOnce:
@pytest.mark.asyncio
async def test_skips_when_disabled(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = False
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_not_idle(self, engine):
engine._last_activity_time = datetime.now(UTC)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 60
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_already_dreaming(self, engine):
engine._is_dreaming = True
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 0
result = await engine.dream_once()
# Reset for cleanliness
engine._is_dreaming = False
assert result is None
@pytest.mark.asyncio
async def test_dream_produces_record_when_idle(self, engine, chat_db):
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
with (
patch("timmy.dreaming.settings") as mock_settings,
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
patch("infrastructure.chat_store.DB_PATH", chat_db),
):
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
mock_settings.dreaming_timeout_seconds = 30
mock_agent.side_effect = [
"I could have provided a concrete try/except example.", # simulation
"When explaining errors, always include a runnable code snippet.", # rule
]
result = await engine.dream_once()
assert result is not None
assert isinstance(result, DreamRecord)
assert result.simulation
assert result.proposed_rule
assert engine.count_dreams() == 1