Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
0b284972cb fix: correct complexity routing to not fall back to default model
Some checks failed
Tests / lint (pull_request) Failing after 17s
Tests / test (pull_request) Has been skipped
`_get_model_for_complexity` was calling `get_model_with_capability`,
which silently falls back to the provider default when no model has the
requested capability tag.  This caused the method to return a generic
model instead of None when neither the fallback chain nor any explicit
capability tag matched, misleading callers into skipping the provider
default logic.

Replace the call with an explicit next() comprehension that returns None
when no model explicitly carries the 'routine' or 'complex' capability.

Refs #1065

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:30:23 -04:00
Alexander Whitestone
6c5f55230b WIP: Claude Code progress on #1065
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:41:42 -04:00
13 changed files with 600 additions and 854 deletions

View File

@@ -25,6 +25,19 @@ providers:
tier: local
url: "http://localhost:11434"
models:
# ── Dual-model routing: Qwen3-8B (fast) + Qwen3-14B (quality) ──────────
# Both models fit simultaneously: ~6.6 GB + ~10.5 GB = ~17 GB combined.
# Requires OLLAMA_MAX_LOADED_MODELS=2 (set in .env) to stay hot.
# Ref: issue #1065 — Qwen3-8B/14B dual-model routing strategy
- name: qwen3:8b
context_window: 32768
capabilities: [text, tools, json, streaming, routine]
description: "Qwen3-8B Q6_K — fast router for routine tasks (~6.6 GB, 45-55 tok/s)"
- name: qwen3:14b
context_window: 40960
capabilities: [text, tools, json, streaming, complex, reasoning]
description: "Qwen3-14B Q5_K_M — complex reasoning and planning (~10.5 GB, 20-28 tok/s)"
# Text + Tools models
- name: qwen3:30b
default: true
@@ -187,6 +200,20 @@ fallback_chains:
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Complexity-based routing chains (issue #1065) ───────────────────────
# Routine tasks: prefer Qwen3-8B for low latency (~45-55 tok/s)
routine:
- qwen3:8b # Primary fast model
- llama3.1:8b-instruct # Fallback fast model
- llama3.2:3b # Smallest available
# Complex tasks: prefer Qwen3-14B for quality (~20-28 tok/s)
complex:
- qwen3:14b # Primary quality model
- hermes4-14b # Native tool calling, hybrid reasoning
- qwen3:30b # Highest local quality
- qwen2.5:14b # Additional fallback
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.

View File

@@ -41,6 +41,13 @@ class Settings(BaseSettings):
# 4096 keeps memory at ~19GB. Set to 0 to use model defaults.
ollama_num_ctx: int = 4096
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
# or add it to your systemd/launchd unit before starting the harness.
ollama_max_loaded_models: int = 2
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:30b,llama3.1"
# Or edit config/providers.yaml → fallback_chains for the canonical source.
@@ -289,14 +296,6 @@ class Settings(BaseSettings):
thinking_memory_check_every: int = 50 # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
# ── Dreaming Mode ─────────────────────────────────────────────────
# When enabled, the agent replays past sessions during idle time to
# simulate alternative actions and propose behavioural rules.
dreaming_enabled: bool = True
dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
dreaming_cycle_seconds: int = 600 # seconds between dream attempts
dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.

View File

@@ -35,7 +35,6 @@ from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
from dashboard.routes.daily_run import router as daily_run_router
from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.dreaming import router as dreaming_router
from dashboard.routes.experiments import router as experiments_router
from dashboard.routes.grok import router as grok_router
from dashboard.routes.health import router as health_router
@@ -220,36 +219,6 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
async def _dreaming_scheduler() -> None:
"""Background task: run idle-time dreaming cycles.
When the system has been idle for ``dreaming_idle_threshold_minutes``,
the dreaming engine replays a past session and simulates alternatives.
"""
from timmy.dreaming import dreaming_engine
await asyncio.sleep(15) # Stagger after loop QA scheduler
while True:
try:
if settings.dreaming_enabled:
await asyncio.wait_for(
dreaming_engine.dream_once(),
timeout=settings.dreaming_timeout_seconds + 10,
)
except TimeoutError:
logger.warning(
"Dreaming cycle timed out after %ds",
settings.dreaming_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Dreaming scheduler error: %s", exc)
await asyncio.sleep(settings.dreaming_cycle_seconds)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -410,7 +379,6 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_dreaming_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
@@ -673,7 +641,6 @@ app.include_router(daily_run_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(dreaming_router)
@app.websocket("/ws")

View File

@@ -1,85 +0,0 @@
"""Dreaming mode dashboard routes.
GET /dreaming/api/status — JSON status of the dreaming engine
GET /dreaming/api/recent — JSON list of recent dream records
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
GET /dreaming/partial — HTMX partial: dreaming status panel
"""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.templating import templates
from timmy.dreaming import dreaming_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
@router.get("/api/status", response_class=JSONResponse)
async def dreaming_status():
"""Return current dreaming engine status as JSON."""
return dreaming_engine.get_status()
@router.get("/api/recent", response_class=JSONResponse)
async def dreaming_recent(limit: int = 10):
"""Return recent dream records as JSON."""
dreams = dreaming_engine.get_recent_dreams(limit=limit)
return [
{
"id": d.id,
"session_excerpt": d.session_excerpt[:200],
"decision_point": d.decision_point[:200],
"simulation": d.simulation,
"proposed_rule": d.proposed_rule,
"created_at": d.created_at,
}
for d in dreams
]
@router.post("/api/trigger", response_class=JSONResponse)
async def dreaming_trigger():
"""Manually trigger a dream cycle (bypasses idle check).
Useful for testing and manual inspection. Forces idle state temporarily.
"""
from datetime import UTC, datetime, timedelta
from config import settings
# Temporarily back-date last activity to appear idle
original_time = dreaming_engine._last_activity_time
dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
minutes=settings.dreaming_idle_threshold_minutes + 1
)
try:
dream = await dreaming_engine.dream_once()
finally:
dreaming_engine._last_activity_time = original_time
if dream:
return {
"status": "ok",
"dream_id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
}
return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
@router.get("/partial", response_class=HTMLResponse)
async def dreaming_partial(request: Request):
"""HTMX partial: dreaming status panel for the dashboard."""
status = dreaming_engine.get_status()
recent = dreaming_engine.get_recent_dreams(limit=5)
return templates.TemplateResponse(
request,
"partials/dreaming_status.html",
{"status": status, "recent_dreams": recent},
)

View File

@@ -1,32 +0,0 @@
{% if not status.enabled %}
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
{% elif status.dreaming %}
<div class="dream-active">
<span class="dream-pulse"></span>
<span class="dream-label">DREAMING</span>
<div class="dream-summary">{{ status.current_summary }}</div>
</div>
{% elif status.idle %}
<div class="dream-idle">
<span class="dream-dot dream-dot-idle"></span>
<span class="dream-label-idle">IDLE</span>
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
</div>
{% else %}
<div class="dream-standby">
<span class="dream-dot dream-dot-standby"></span>
<span class="dream-label-standby">STANDBY</span>
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
</div>
{% endif %}
{% if recent_dreams %}
<div class="dream-history mt-2">
{% for d in recent_dreams %}
<div class="dream-record">
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
</div>
{% endfor %}
</div>
{% endif %}

View File

@@ -2,6 +2,7 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .classifier import TaskComplexity, classify_task
from .history import HealthHistoryStore, get_history_store
__all__ = [
@@ -12,4 +13,6 @@ __all__ = [
"router",
"HealthHistoryStore",
"get_history_store",
"TaskComplexity",
"classify_task",
]

View File

@@ -528,6 +528,34 @@ class CascadeRouter:
return True
def _get_model_for_complexity(
self, provider: Provider, complexity: "TaskComplexity"
) -> str | None:
"""Return the best model on *provider* for the given complexity tier.
Checks fallback chains first (routine / complex), then falls back to
any model with the matching capability tag, then the provider default.
"""
from infrastructure.router.classifier import TaskComplexity
chain_key = "routine" if complexity == TaskComplexity.SIMPLE else "complex"
# Walk the capability fallback chain — first model present on this provider wins
for model_name in self.config.fallback_chains.get(chain_key, []):
if any(m["name"] == model_name for m in provider.models):
return model_name
# Direct capability lookup — only return if a model explicitly has the tag
# (do not use get_model_with_capability here as it falls back to the default)
cap_model = next(
(m["name"] for m in provider.models if chain_key in m.get("capabilities", [])),
None,
)
if cap_model:
return cap_model
return None # Caller will use provider default
async def complete(
self,
messages: list[dict],
@@ -535,6 +563,7 @@ class CascadeRouter:
temperature: float = 0.7,
max_tokens: int | None = None,
cascade_tier: str | None = None,
complexity_hint: str | None = None,
) -> dict:
"""Complete a chat conversation with automatic failover.
@@ -543,24 +572,48 @@ class CascadeRouter:
- Falls back to vision-capable models when needed
- Supports image URLs, paths, and base64 encoding
Complexity-based routing (issue #1065):
- ``complexity_hint="simple"`` → routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` → routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) → auto-classifies from messages
Args:
messages: List of message dicts with role and content
model: Preferred model (tries this first, then provider defaults)
model: Preferred model (tries this first; complexity routing is
skipped when an explicit model is given)
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
cascade_tier: If specified, filters providers by this tier.
- "frontier_required": Uses only Anthropic provider for top-tier models.
complexity_hint: "simple", "complex", or None (auto-detect).
Returns:
Dict with content, provider_used, and metrics
Dict with content, provider_used, model, latency_ms,
is_fallback_model, and complexity fields.
Raises:
RuntimeError: If all providers fail
"""
from infrastructure.router.classifier import TaskComplexity, classify_task
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
# Resolve task complexity ─────────────────────────────────────────────
# Skip complexity routing when caller explicitly specifies a model.
complexity: TaskComplexity | None = None
if model is None:
if complexity_hint is not None:
try:
complexity = TaskComplexity(complexity_hint.lower())
except ValueError:
logger.warning("Unknown complexity_hint %r, auto-classifying", complexity_hint)
complexity = classify_task(messages)
else:
complexity = classify_task(messages)
logger.debug("Task complexity: %s", complexity.value)
errors = []
providers = self.providers
@@ -573,7 +626,6 @@ class CascadeRouter:
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
if not self._is_provider_available(provider):
continue
@@ -587,7 +639,21 @@ class CascadeRouter:
)
continue
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
# Complexity-based model selection (only when no explicit model) ──
effective_model = model
if effective_model is None and complexity is not None:
effective_model = self._get_model_for_complexity(provider, complexity)
if effective_model:
logger.debug(
"Complexity routing [%s]: %s%s",
complexity.value,
provider.name,
effective_model,
)
selected_model, is_fallback_model = self._select_model(
provider, effective_model, content_type
)
try:
result = await self._attempt_with_retry(
@@ -610,6 +676,7 @@ class CascadeRouter:
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
"complexity": complexity.value if complexity is not None else None,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -0,0 +1,166 @@
"""Task complexity classifier for Qwen3 dual-model routing.
Classifies incoming tasks as SIMPLE (route to Qwen3-8B for low-latency)
or COMPLEX (route to Qwen3-14B for quality-sensitive work).
Classification is fully heuristic — no LLM inference required.
"""
import re
from enum import Enum
class TaskComplexity(Enum):
"""Task complexity tier for model routing."""
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
# Keywords strongly associated with complex tasks
_COMPLEX_KEYWORDS: frozenset[str] = frozenset(
[
"plan",
"review",
"analyze",
"analyse",
"triage",
"refactor",
"design",
"architecture",
"implement",
"compare",
"debug",
"explain",
"prioritize",
"prioritise",
"strategy",
"optimize",
"optimise",
"evaluate",
"assess",
"brainstorm",
"outline",
"summarize",
"summarise",
"generate code",
"write a",
"write the",
"code review",
"pull request",
"multi-step",
"multi step",
"step by step",
"backlog prioriti",
"issue triage",
"root cause",
"how does",
"why does",
"what are the",
]
)
# Keywords strongly associated with simple/routine tasks
_SIMPLE_KEYWORDS: frozenset[str] = frozenset(
[
"status",
"list ",
"show ",
"what is",
"how many",
"ping",
"run ",
"execute ",
"ls ",
"cat ",
"ps ",
"fetch ",
"count ",
"tail ",
"head ",
"grep ",
"find file",
"read file",
"get ",
"query ",
"check ",
"yes",
"no",
"ok",
"done",
"thanks",
]
)
# Content longer than this is treated as complex regardless of keywords
_COMPLEX_CHAR_THRESHOLD = 500
# Short content defaults to simple
_SIMPLE_CHAR_THRESHOLD = 150
# More than this many messages suggests an ongoing complex conversation
_COMPLEX_CONVERSATION_DEPTH = 6
def classify_task(messages: list[dict]) -> TaskComplexity:
"""Classify task complexity from a list of messages.
Uses heuristic rules — no LLM call required. Errs toward COMPLEX
when uncertain so that quality is preserved.
Args:
messages: List of message dicts with ``role`` and ``content`` keys.
Returns:
TaskComplexity.SIMPLE or TaskComplexity.COMPLEX
"""
if not messages:
return TaskComplexity.SIMPLE
# Concatenate all user-turn content for analysis
user_content = " ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human")
and isinstance(msg.get("content"), str)
).lower().strip()
if not user_content:
return TaskComplexity.SIMPLE
# Complexity signals override everything -----------------------------------
# Explicit complex keywords
for kw in _COMPLEX_KEYWORDS:
if kw in user_content:
return TaskComplexity.COMPLEX
# Numbered / multi-step instruction list: "1. do this 2. do that"
if re.search(r"\b\d+\.\s+\w", user_content):
return TaskComplexity.COMPLEX
# Code blocks embedded in messages
if "```" in user_content:
return TaskComplexity.COMPLEX
# Long content → complex reasoning likely required
if len(user_content) > _COMPLEX_CHAR_THRESHOLD:
return TaskComplexity.COMPLEX
# Deep conversation → complex ongoing task
if len(messages) > _COMPLEX_CONVERSATION_DEPTH:
return TaskComplexity.COMPLEX
# Simplicity signals -------------------------------------------------------
# Explicit simple keywords
for kw in _SIMPLE_KEYWORDS:
if kw in user_content:
return TaskComplexity.SIMPLE
# Short single-sentence messages default to simple
if len(user_content) <= _SIMPLE_CHAR_THRESHOLD:
return TaskComplexity.SIMPLE
# When uncertain, prefer quality (complex model)
return TaskComplexity.COMPLEX

View File

@@ -1,434 +0,0 @@
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
When the dashboard has been idle for a configurable period, this engine
selects a past chat session, identifies key agent response points, and
asks the LLM to simulate alternative approaches. Insights are stored as
proposed rules that can feed the auto-crystallizer or memory system.
Usage::
from timmy.dreaming import dreaming_engine
# Run one dream cycle (called by the background scheduler)
await dreaming_engine.dream_once()
# Query recent dreams
dreams = dreaming_engine.get_recent_dreams(limit=10)
# Get current status dict for API/dashboard
status = dreaming_engine.get_status()
"""
import logging
import re
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path("data/dreams.db")
# Strip <think> tags from reasoning model output
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Minimum messages in a session to be worth replaying
_MIN_SESSION_MESSAGES = 3
# Gap in seconds between messages that signals a new session
_SESSION_GAP_SECONDS = 1800 # 30 minutes
@dataclass
class DreamRecord:
"""A single completed dream cycle."""
id: str
session_excerpt: str # Short excerpt from the replayed session
decision_point: str # The agent message that was re-simulated
simulation: str # The alternative response generated
proposed_rule: str # Rule extracted from the simulation
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS dreams (
id TEXT PRIMARY KEY,
session_excerpt TEXT NOT NULL,
decision_point TEXT NOT NULL,
simulation TEXT NOT NULL,
proposed_rule TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
conn.commit()
yield conn
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
return DreamRecord(
id=row["id"],
session_excerpt=row["session_excerpt"],
decision_point=row["decision_point"],
simulation=row["simulation"],
proposed_rule=row["proposed_rule"],
created_at=row["created_at"],
)
class DreamingEngine:
"""Idle-time dreaming engine — replays sessions and simulates alternatives."""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_activity_time: datetime = datetime.now(UTC)
self._is_dreaming: bool = False
self._current_dream_summary: str = ""
self._dreaming_agent = None # Lazy-initialised
# ── Public API ────────────────────────────────────────────────────────
def record_activity(self) -> None:
"""Reset the idle timer — call this on every user/agent interaction."""
self._last_activity_time = datetime.now(UTC)
def is_idle(self) -> bool:
"""Return True if the system has been idle long enough to start dreaming."""
threshold = settings.dreaming_idle_threshold_minutes
if threshold <= 0:
return False
return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
def get_status(self) -> dict[str, Any]:
"""Return a status dict suitable for API/dashboard consumption."""
return {
"enabled": settings.dreaming_enabled,
"dreaming": self._is_dreaming,
"idle": self.is_idle(),
"current_summary": self._current_dream_summary,
"idle_minutes": int(
(datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
),
"idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
"dream_count": self.count_dreams(),
}
async def dream_once(self) -> DreamRecord | None:
"""Execute one dream cycle.
Returns the stored DreamRecord, or None if the cycle was skipped
(not idle, dreaming disabled, no suitable session, or LLM error).
"""
if not settings.dreaming_enabled:
return None
if not self.is_idle():
logger.debug(
"Dreaming skipped — system active (idle for %d min, threshold %d min)",
int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
settings.dreaming_idle_threshold_minutes,
)
return None
if self._is_dreaming:
logger.debug("Dreaming skipped — cycle already in progress")
return None
self._is_dreaming = True
self._current_dream_summary = "Selecting a past session…"
await self._broadcast_status()
try:
return await self._run_dream_cycle()
except Exception as exc:
logger.warning("Dream cycle failed: %s", exc)
return None
finally:
self._is_dreaming = False
self._current_dream_summary = ""
await self._broadcast_status()
def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
"""Retrieve the most recent dream records."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_dream(r) for r in rows]
def count_dreams(self) -> int:
"""Return total number of stored dream records."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
return row["c"] if row else 0
# ── Private helpers ───────────────────────────────────────────────────
async def _run_dream_cycle(self) -> DreamRecord | None:
"""Core dream logic: select → simulate → store."""
# 1. Select a past session from the chat log
session = await self._select_session()
if not session:
logger.debug("No suitable chat session found for dreaming")
self._current_dream_summary = "No past sessions to replay"
return None
decision_point, session_excerpt = session
self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}"
await self._broadcast_status()
# 2. Simulate an alternative response
simulation = await self._simulate_alternative(decision_point, session_excerpt)
if not simulation:
logger.debug("Dream simulation produced no output")
return None
# 3. Extract a proposed rule
proposed_rule = await self._extract_rule(decision_point, simulation)
# 4. Store and broadcast
dream = self._store_dream(
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
)
self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
logger.info(
"Dream [%s]: replayed session, proposed rule: %s",
dream.id[:8],
proposed_rule[:80] if proposed_rule else "(none)",
)
await self._broadcast_status()
await self._broadcast_dream(dream)
return dream
async def _select_session(self) -> tuple[str, str] | None:
"""Select a past chat session and return (decision_point, session_excerpt).
Uses the SQLite chat store. Groups messages into sessions by time
gap. Picks a random session with enough messages, then selects one
agent response as the decision point.
"""
try:
from infrastructure.chat_store import DB_PATH
if not DB_PATH.exists():
return None
import asyncio
rows = await asyncio.to_thread(self._load_chat_rows)
if not rows:
return None
sessions = self._group_into_sessions(rows)
if not sessions:
return None
# Filter sessions with enough messages
valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
if not valid:
return None
import random
session = random.choice(valid) # noqa: S311 (not cryptographic)
# Build a short text excerpt (last N messages)
excerpt_msgs = session[-6:]
excerpt = "\n".join(
f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
)
# Find agent responses as candidate decision points
agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
if not agent_msgs:
return None
decision = random.choice(agent_msgs) # noqa: S311
return decision["content"], excerpt
except Exception as exc:
logger.warning("Session selection failed: %s", exc)
return None
def _load_chat_rows(self) -> list[dict]:
"""Synchronously load chat messages from SQLite."""
from infrastructure.chat_store import DB_PATH
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, timestamp FROM chat_messages "
"ORDER BY timestamp ASC"
).fetchall()
return [dict(r) for r in rows]
def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
"""Group chat rows into sessions based on time gaps."""
if not rows:
return []
sessions: list[list[dict]] = []
current: list[dict] = [rows[0]]
for prev, curr in zip(rows, rows[1:], strict=False):
try:
t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
gap = (t_curr - t_prev).total_seconds()
except Exception:
gap = 0
if gap > _SESSION_GAP_SECONDS:
sessions.append(current)
current = [curr]
else:
current.append(curr)
sessions.append(current)
return sessions
async def _simulate_alternative(
self, decision_point: str, session_excerpt: str
) -> str:
"""Ask the LLM to simulate an alternative response."""
prompt = (
"You are Timmy, a sovereign AI agent in a dreaming state.\n"
"You are replaying a past conversation and exploring what you could "
"have done differently at a key decision point.\n\n"
"PAST SESSION EXCERPT:\n"
f"{session_excerpt}\n\n"
"KEY DECISION POINT (your past response):\n"
f"{decision_point[:500]}\n\n"
"TASK: In 2-3 sentences, describe ONE concrete alternative approach "
"you could have taken at this decision point that would have been "
"more helpful, more accurate, or more efficient.\n"
"Be specific — reference the actual content of the conversation.\n"
"Do NOT include meta-commentary about dreaming or this exercise.\n\n"
"Alternative approach:"
)
raw = await self._call_agent(prompt)
return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
async def _extract_rule(self, decision_point: str, simulation: str) -> str:
"""Extract a proposed behaviour rule from the simulation."""
prompt = (
"Given this pair of agent responses:\n\n"
f"ORIGINAL: {decision_point[:300]}\n\n"
f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
"Extract ONE concise rule (max 20 words) that captures what to do "
"differently next time. Format: 'When X, do Y instead of Z.'\n"
"Rule:"
)
raw = await self._call_agent(prompt)
rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
# Keep only the first sentence/line
rule = rule.split("\n")[0].strip().rstrip(".")
return rule[:200] # Safety cap
async def _call_agent(self, prompt: str) -> str:
"""Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
import asyncio
if self._dreaming_agent is None:
from timmy.agent import create_timmy
self._dreaming_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(settings.dreaming_timeout_seconds):
run = await self._dreaming_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
return ""
except Exception as exc:
logger.warning("Dreaming LLM call failed: %s", exc)
return ""
raw = run.content if hasattr(run, "content") else str(run)
return raw or ""
def _store_dream(
self,
*,
session_excerpt: str,
decision_point: str,
simulation: str,
proposed_rule: str,
) -> DreamRecord:
dream = DreamRecord(
id=str(uuid.uuid4()),
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
created_at=datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO dreams
(id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
dream.id,
dream.session_excerpt,
dream.decision_point,
dream.simulation,
dream.proposed_rule,
dream.created_at,
),
)
conn.commit()
return dream
async def _broadcast_status(self) -> None:
"""Push current dreaming status via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("dreaming_state", self.get_status())
except Exception as exc:
logger.debug("Dreaming status broadcast failed: %s", exc)
async def _broadcast_dream(self, dream: DreamRecord) -> None:
"""Push a completed dream record via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"dreaming_complete",
{
"id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
"created_at": dream.created_at,
},
)
except Exception as exc:
logger.debug("Dreaming complete broadcast failed: %s", exc)
# Module-level singleton
dreaming_engine = DreamingEngine()

View File

@@ -2547,44 +2547,3 @@
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
/* ═══════════════════════════════════════════════════════════════
Dreaming Mode
═══════════════════════════════════════════════════════════════ */
.dream-active {
display: flex; align-items: center; gap: 8px;
padding: 6px 0;
}
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
.dream-pulse {
display: inline-block; width: 8px; height: 8px; border-radius: 50%;
background: var(--purple);
animation: dream-pulse 1.8s ease-in-out infinite;
}
@keyframes dream-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(0.7); }
}
.dream-dot {
display: inline-block; width: 7px; height: 7px; border-radius: 50%;
}
.dream-dot-idle { background: var(--amber); }
.dream-dot-standby { background: var(--text-dim); }
.dream-idle, .dream-standby {
display: flex; align-items: center; gap: 6px; padding: 4px 0;
}
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
.dream-record:last-child { border-bottom: none; }
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }

View File

@@ -968,3 +968,195 @@ class TestCascadeRouterReload:
assert router.providers[0].name == "low-priority"
assert router.providers[1].name == "high-priority"
class TestComplexityRouting:
"""Tests for Qwen3-8B / Qwen3-14B dual-model routing (issue #1065)."""
def _make_dual_model_provider(self) -> Provider:
"""Build an Ollama provider with both Qwen3 models registered."""
return Provider(
name="ollama-local",
type="ollama",
enabled=True,
priority=1,
url="http://localhost:11434",
models=[
{
"name": "qwen3:8b",
"capabilities": ["text", "tools", "json", "streaming", "routine"],
},
{
"name": "qwen3:14b",
"default": True,
"capabilities": ["text", "tools", "json", "streaming", "complex", "reasoning"],
},
],
)
def test_get_model_for_complexity_simple_returns_8b(self):
"""Simple tasks should select the model with 'routine' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model == "qwen3:8b"
def test_get_model_for_complexity_complex_returns_14b(self):
"""Complex tasks should select the model with 'complex' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.COMPLEX)
assert model == "qwen3:14b"
def test_get_model_for_complexity_returns_none_when_no_match(self):
"""Returns None when provider has no matching model in chain."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {} # empty chains
provider = Provider(
name="test",
type="ollama",
enabled=True,
priority=1,
models=[{"name": "llama3.2:3b", "default": True, "capabilities": ["text"]}],
)
# No 'routine' or 'complex' model available
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model is None
@pytest.mark.asyncio
async def test_complete_with_simple_hint_routes_to_8b(self):
"""complexity_hint='simple' should use qwen3:8b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "fast answer", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
complexity_hint="simple",
)
assert result["model"] == "qwen3:8b"
assert result["complexity"] == "simple"
@pytest.mark.asyncio
async def test_complete_with_complex_hint_routes_to_14b(self):
"""complexity_hint='complex' should use qwen3:14b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "detailed answer", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "review this PR"}],
complexity_hint="complex",
)
assert result["model"] == "qwen3:14b"
assert result["complexity"] == "complex"
@pytest.mark.asyncio
async def test_explicit_model_bypasses_complexity_routing(self):
"""When model is explicitly provided, complexity routing is skipped."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "response", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
model="qwen3:14b", # explicit override
)
# Explicit model wins — complexity field is None
assert result["model"] == "qwen3:14b"
assert result["complexity"] is None
@pytest.mark.asyncio
async def test_auto_classification_routes_simple_message(self):
"""Short, simple messages should auto-classify as SIMPLE → 8B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
# no complexity_hint — auto-classify
)
assert result["complexity"] == "simple"
assert result["model"] == "qwen3:8b"
@pytest.mark.asyncio
async def test_auto_classification_routes_complex_message(self):
"""Complex messages should auto-classify → 14B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "deep analysis", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "analyze and prioritize the backlog"}],
)
assert result["complexity"] == "complex"
assert result["model"] == "qwen3:14b"
@pytest.mark.asyncio
async def test_invalid_complexity_hint_falls_back_to_auto(self):
"""Invalid complexity_hint should log a warning and auto-classify."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
# Should not raise
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
complexity_hint="INVALID_HINT",
)
assert result["complexity"] in ("simple", "complex") # auto-classified

View File

@@ -0,0 +1,134 @@
"""Tests for Qwen3 dual-model task complexity classifier."""
import pytest
from infrastructure.router.classifier import TaskComplexity, classify_task
class TestClassifyTask:
"""Tests for classify_task heuristics."""
# ── Simple / routine tasks ──────────────────────────────────────────────
def test_empty_messages_is_simple(self):
assert classify_task([]) == TaskComplexity.SIMPLE
def test_no_user_content_is_simple(self):
messages = [{"role": "system", "content": "You are Timmy."}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_status_query_is_simple(self):
messages = [{"role": "user", "content": "status"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_list_command_is_simple(self):
messages = [{"role": "user", "content": "list all tasks"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_get_command_is_simple(self):
messages = [{"role": "user", "content": "get the latest log entry"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_message_under_threshold_is_simple(self):
messages = [{"role": "user", "content": "run the build"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_affirmation_is_simple(self):
messages = [{"role": "user", "content": "yes"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
# ── Complex / quality-sensitive tasks ──────────────────────────────────
def test_plan_keyword_is_complex(self):
messages = [{"role": "user", "content": "plan the sprint"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_review_keyword_is_complex(self):
messages = [{"role": "user", "content": "review this code"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyze_keyword_is_complex(self):
messages = [{"role": "user", "content": "analyze performance"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_triage_keyword_is_complex(self):
messages = [{"role": "user", "content": "triage the open issues"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_refactor_keyword_is_complex(self):
messages = [{"role": "user", "content": "refactor the auth module"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_explain_keyword_is_complex(self):
messages = [{"role": "user", "content": "explain how the router works"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_prioritize_keyword_is_complex(self):
messages = [{"role": "user", "content": "prioritize the backlog"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_long_message_is_complex(self):
long_msg = "do something " * 50 # > 500 chars
messages = [{"role": "user", "content": long_msg}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_numbered_list_is_complex(self):
messages = [
{
"role": "user",
"content": "1. Read the file 2. Analyze it 3. Write a report",
}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_code_block_is_complex(self):
messages = [
{"role": "user", "content": "Here is the code:\n```python\nprint('hello')\n```"}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_deep_conversation_is_complex(self):
messages = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "now do the thing"},
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyse_british_spelling_is_complex(self):
messages = [{"role": "user", "content": "analyse this dataset"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_non_string_content_is_ignored(self):
"""Non-string content should not crash the classifier."""
messages = [{"role": "user", "content": ["part1", "part2"]}]
# Should not raise; result doesn't matter — just must not blow up
result = classify_task(messages)
assert isinstance(result, TaskComplexity)
def test_system_message_not_counted_as_user(self):
"""System message alone should not trigger complex keywords."""
messages = [
{"role": "system", "content": "analyze everything carefully"},
{"role": "user", "content": "yes"},
]
# "analyze" is in system message (not user) — user says "yes" → simple
assert classify_task(messages) == TaskComplexity.SIMPLE
class TestTaskComplexityEnum:
"""Tests for TaskComplexity enum values."""
def test_simple_value(self):
assert TaskComplexity.SIMPLE.value == "simple"
def test_complex_value(self):
assert TaskComplexity.COMPLEX.value == "complex"
def test_lookup_by_value(self):
assert TaskComplexity("simple") == TaskComplexity.SIMPLE
assert TaskComplexity("complex") == TaskComplexity.COMPLEX

View File

@@ -1,217 +0,0 @@
"""Unit tests for the Dreaming mode engine."""
import sqlite3
from contextlib import closing
from datetime import UTC, datetime, timedelta
from unittest.mock import AsyncMock, patch
import pytest
from timmy.dreaming import _SESSION_GAP_SECONDS, DreamingEngine, DreamRecord
pytestmark = pytest.mark.unit
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def tmp_dreams_db(tmp_path):
"""Return a temporary path for the dreams database."""
return tmp_path / "dreams.db"
@pytest.fixture()
def engine(tmp_dreams_db):
"""DreamingEngine backed by a temp database."""
return DreamingEngine(db_path=tmp_dreams_db)
@pytest.fixture()
def chat_db(tmp_path):
"""Create a minimal chat database with some messages."""
db_path = tmp_path / "chat.db"
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'browser'
)
""")
now = datetime.now(UTC)
messages = [
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
]
conn.executemany(
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
messages,
)
conn.commit()
return db_path
# ── Idle detection ─────────────────────────────────────────────────────────────
class TestIdleDetection:
def test_not_idle_immediately(self, engine):
assert engine.is_idle() is False
def test_idle_after_threshold(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is True
def test_not_idle_when_threshold_zero(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 0
assert engine.is_idle() is False
def test_record_activity_resets_timer(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
engine.record_activity()
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is False
# ── Status dict ───────────────────────────────────────────────────────────────
class TestGetStatus:
def test_status_shape(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
status = engine.get_status()
assert "enabled" in status
assert "dreaming" in status
assert "idle" in status
assert "dream_count" in status
assert "idle_minutes" in status
def test_dream_count_starts_at_zero(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.get_status()["dream_count"] == 0
# ── Session grouping ──────────────────────────────────────────────────────────
class TestGroupIntoSessions:
def test_single_session(self, engine):
now = datetime.now(UTC)
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 1
assert len(sessions[0]) == 2
def test_splits_on_large_gap(self, engine):
now = datetime.now(UTC)
gap = _SESSION_GAP_SECONDS + 100
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 2
def test_empty_input(self, engine):
assert engine._group_into_sessions([]) == []
# ── Dream storage ─────────────────────────────────────────────────────────────
class TestDreamStorage:
def test_store_and_retrieve(self, engine):
dream = engine._store_dream(
session_excerpt="User asked about Python.",
decision_point="Python uses try/except blocks.",
simulation="I could have given a code example.",
proposed_rule="When explaining errors, include a code snippet.",
)
assert dream.id
assert dream.proposed_rule == "When explaining errors, include a code snippet."
retrieved = engine.get_recent_dreams(limit=1)
assert len(retrieved) == 1
assert retrieved[0].id == dream.id
def test_count_increments(self, engine):
assert engine.count_dreams() == 0
engine._store_dream(
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
)
assert engine.count_dreams() == 1
# ── dream_once integration ─────────────────────────────────────────────────────
class TestDreamOnce:
@pytest.mark.asyncio
async def test_skips_when_disabled(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = False
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_not_idle(self, engine):
engine._last_activity_time = datetime.now(UTC)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 60
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_already_dreaming(self, engine):
engine._is_dreaming = True
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 0
result = await engine.dream_once()
# Reset for cleanliness
engine._is_dreaming = False
assert result is None
@pytest.mark.asyncio
async def test_dream_produces_record_when_idle(self, engine, chat_db):
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
with (
patch("timmy.dreaming.settings") as mock_settings,
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
patch("infrastructure.chat_store.DB_PATH", chat_db),
):
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
mock_settings.dreaming_timeout_seconds = 30
mock_agent.side_effect = [
"I could have provided a concrete try/except example.", # simulation
"When explaining errors, always include a runnable code snippet.", # rule
]
result = await engine.dream_once()
assert result is not None
assert isinstance(result, DreamRecord)
assert result.simulation
assert result.proposed_rule
assert engine.count_dreams() == 1