-
-
- Loading journal...
-
+
+
+
No modifications recorded yet.
+
Self-coding tasks will appear here when executed.
diff --git a/src/integrations/chat_bridge/vendors/discord.py b/src/integrations/chat_bridge/vendors/discord.py
index de155d74..a77b4cd9 100644
--- a/src/integrations/chat_bridge/vendors/discord.py
+++ b/src/integrations/chat_bridge/vendors/discord.py
@@ -355,25 +355,39 @@ class DiscordVendor(ChatPlatform):
else:
session_id = f"discord_{message.channel.id}"
- # Run Timmy agent (singleton, with session continuity)
+ # Run Timmy agent with typing indicator and timeout
+ response = None
try:
agent = _get_discord_agent()
- run = await asyncio.to_thread(
- agent.run, content, stream=False, session_id=session_id
- )
+
+ # Show typing indicator while the agent processes
+ async with target.typing():
+ run = await asyncio.wait_for(
+ asyncio.to_thread(
+ agent.run, content, stream=False, session_id=session_id
+ ),
+ timeout=300,
+ )
response = run.content if hasattr(run, "content") else str(run)
+ except asyncio.TimeoutError:
+ logger.error("Discord: agent.run() timed out after 300s")
+ response = "Sorry, that took too long. Please try a simpler request."
except Exception as exc:
- logger.error("Timmy error in Discord handler: %s", exc)
- response = f"Timmy is offline: {exc}"
+ logger.error("Discord: agent.run() failed: %s", exc)
+ response = "I'm having trouble reaching my language model right now. Please try again shortly."
# Strip hallucinated tool-call JSON and chain-of-thought narration
from timmy.session import _clean_response
response = _clean_response(response)
- # Discord has a 2000 character limit
+ # Discord has a 2000 character limit — send with error handling
for chunk in _chunk_message(response, 2000):
- await target.send(chunk)
+ try:
+ await target.send(chunk)
+ except Exception as exc:
+ logger.error("Discord: failed to send message chunk: %s", exc)
+ break
async def _get_or_create_thread(self, message):
"""Get the active thread for a channel, or create one.
diff --git a/src/timmy/agentic_loop.py b/src/timmy/agentic_loop.py
new file mode 100644
index 00000000..826884d4
--- /dev/null
+++ b/src/timmy/agentic_loop.py
@@ -0,0 +1,305 @@
+"""Agentic loop — multi-step task execution with progress tracking.
+
+Provides `run_agentic_loop()`, the engine behind the `plan_and_execute` tool.
+When the model recognises a task needs 3+ sequential steps, it calls
+`plan_and_execute(task)` which spawns this loop in the background.
+
+Flow:
+ 1. Planning — ask the model to break the task into numbered steps
+ 2. Execution — run each step sequentially, feeding results forward
+ 3. Adaptation — on failure, ask the model to adapt the plan
+ 4. Summary — ask the model to summarise what was accomplished
+
+Progress is broadcast via WebSocket so the dashboard can show live updates.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import re
+import time
+import uuid
+from dataclasses import dataclass, field
+from typing import Callable, Optional
+
+logger = logging.getLogger(__name__)
+
+
+# ---------------------------------------------------------------------------
+# Data structures
+# ---------------------------------------------------------------------------
+
+@dataclass
+class AgenticStep:
+ """Result of a single step in the agentic loop."""
+ step_num: int
+ description: str
+ result: str
+ status: str # "completed" | "failed" | "adapted"
+ duration_ms: int
+
+
+@dataclass
+class AgenticResult:
+ """Final result of the entire agentic loop."""
+ task_id: str
+ task: str
+ summary: str
+ steps: list[AgenticStep] = field(default_factory=list)
+ status: str = "completed" # "completed" | "partial" | "failed"
+ total_duration_ms: int = 0
+
+
+# ---------------------------------------------------------------------------
+# Agent factory
+# ---------------------------------------------------------------------------
+
+def _get_loop_agent():
+ """Create a fresh agent for the agentic loop.
+
+    Returns the same type of agent as `create_timmy()`; session isolation
+    is achieved by callers passing dedicated session_ids to `agent.run()`.
+    """
+ from timmy.agent import create_timmy
+ return create_timmy()
+
+
+# ---------------------------------------------------------------------------
+# Plan parser
+# ---------------------------------------------------------------------------
+
+_STEP_RE = re.compile(r"^\s*(\d+)[.)]\s*(.+)$", re.MULTILINE)
+
+
+def _parse_steps(plan_text: str) -> list[str]:
+ """Extract numbered steps from the model's planning output."""
+ matches = _STEP_RE.findall(plan_text)
+ if matches:
+ return [desc.strip() for _, desc in matches]
+ # Fallback: split on newlines, ignore blanks
+ return [line.strip() for line in plan_text.strip().splitlines() if line.strip()]
+
+
+# ---------------------------------------------------------------------------
+# Core loop
+# ---------------------------------------------------------------------------
+
+async def run_agentic_loop(
+ task: str,
+ *,
+ session_id: str = "agentic",
+ max_steps: int = 0,
+ on_progress: Optional[Callable] = None,
+) -> AgenticResult:
+ """Execute a multi-step task with planning, execution, and adaptation.
+
+ Args:
+ task: Full description of the task to execute.
+ session_id: Agno session_id for conversation continuity.
+ max_steps: Max steps to execute (0 = use config default).
+ on_progress: Optional async callback(description, step_num, total_steps).
+
+ Returns:
+ AgenticResult with steps, summary, and status.
+ """
+ from config import settings
+
+ if max_steps <= 0:
+ max_steps = getattr(settings, "max_agent_steps", 10)
+
+ task_id = str(uuid.uuid4())[:8]
+ start_time = time.monotonic()
+
+ agent = _get_loop_agent()
+ result = AgenticResult(task_id=task_id, task=task, summary="")
+
+ # ── Phase 1: Planning ──────────────────────────────────────────────────
+ plan_prompt = (
+ f"Break this task into numbered steps (max {max_steps}). "
+ f"Return ONLY a numbered list, nothing else.\n\n"
+ f"Task: {task}"
+ )
+ try:
+ plan_run = await asyncio.to_thread(
+ agent.run, plan_prompt, stream=False, session_id=f"{session_id}_plan"
+ )
+ plan_text = plan_run.content if hasattr(plan_run, "content") else str(plan_run)
+ except Exception as exc:
+ logger.error("Agentic loop: planning failed: %s", exc)
+ result.status = "failed"
+ result.summary = f"Planning failed: {exc}"
+ result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
+ return result
+
+ steps = _parse_steps(plan_text)
+ if not steps:
+ result.status = "failed"
+ result.summary = "Planning produced no steps."
+ result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
+ return result
+
+ # Enforce max_steps — track if we truncated
+ planned_steps = len(steps)
+ steps = steps[:max_steps]
+ total_steps = len(steps)
+ was_truncated = planned_steps > total_steps
+
+ # Broadcast plan
+ await _broadcast_progress("agentic.plan_ready", {
+ "task_id": task_id,
+ "task": task,
+ "steps": steps,
+ "total": total_steps,
+ })
+
+ # ── Phase 2: Execution ─────────────────────────────────────────────────
+ completed_results: list[str] = []
+
+ for i, step_desc in enumerate(steps, 1):
+ step_start = time.monotonic()
+
+ context = (
+ f"Task: {task}\n"
+ f"Plan: {plan_text}\n"
+ f"Completed so far: {completed_results}\n\n"
+ f"Now do step {i}: {step_desc}\n"
+ f"Execute this step and report what you did."
+ )
+
+ try:
+ step_run = await asyncio.to_thread(
+ agent.run, context, stream=False, session_id=f"{session_id}_step{i}"
+ )
+ step_result = step_run.content if hasattr(step_run, "content") else str(step_run)
+
+ # Clean the response
+ from timmy.session import _clean_response
+ step_result = _clean_response(step_result)
+
+ step = AgenticStep(
+ step_num=i,
+ description=step_desc,
+ result=step_result,
+ status="completed",
+ duration_ms=int((time.monotonic() - step_start) * 1000),
+ )
+ result.steps.append(step)
+ completed_results.append(f"Step {i}: {step_result[:200]}")
+
+ # Broadcast progress
+ await _broadcast_progress("agentic.step_complete", {
+ "task_id": task_id,
+ "step": i,
+ "total": total_steps,
+ "description": step_desc,
+ "result": step_result[:200],
+ })
+
+ if on_progress:
+ await on_progress(step_desc, i, total_steps)
+
+ except Exception as exc:
+ logger.warning("Agentic loop step %d failed: %s", i, exc)
+
+ # ── Adaptation: ask model to adapt ─────────────────────────────
+ adapt_prompt = (
+ f"Step {i} failed with error: {exc}\n"
+ f"Original step was: {step_desc}\n"
+ f"Adapt the plan and try an alternative approach for this step."
+ )
+ try:
+ adapt_run = await asyncio.to_thread(
+ agent.run, adapt_prompt, stream=False,
+ session_id=f"{session_id}_adapt{i}",
+ )
+ adapt_result = adapt_run.content if hasattr(adapt_run, "content") else str(adapt_run)
+ from timmy.session import _clean_response
+ adapt_result = _clean_response(adapt_result)
+
+ step = AgenticStep(
+ step_num=i,
+ description=f"[Adapted] {step_desc}",
+ result=adapt_result,
+ status="adapted",
+ duration_ms=int((time.monotonic() - step_start) * 1000),
+ )
+ result.steps.append(step)
+ completed_results.append(f"Step {i} (adapted): {adapt_result[:200]}")
+
+ await _broadcast_progress("agentic.step_adapted", {
+ "task_id": task_id,
+ "step": i,
+ "total": total_steps,
+ "description": step_desc,
+ "error": str(exc),
+ "adaptation": adapt_result[:200],
+ })
+
+ if on_progress:
+ await on_progress(f"[Adapted] {step_desc}", i, total_steps)
+
+ except Exception as adapt_exc:
+ logger.error("Agentic loop adaptation also failed: %s", adapt_exc)
+ step = AgenticStep(
+ step_num=i,
+ description=step_desc,
+ result=f"Failed: {exc}; Adaptation also failed: {adapt_exc}",
+ status="failed",
+ duration_ms=int((time.monotonic() - step_start) * 1000),
+ )
+ result.steps.append(step)
+ completed_results.append(f"Step {i}: FAILED")
+
+ # ── Phase 3: Summary ───────────────────────────────────────────────────
+ summary_prompt = (
+ f"Task: {task}\n"
+ f"Results:\n" + "\n".join(completed_results) + "\n\n"
+ f"Summarise what was accomplished in 2-3 sentences."
+ )
+ try:
+ summary_run = await asyncio.to_thread(
+ agent.run, summary_prompt, stream=False,
+ session_id=f"{session_id}_summary",
+ )
+ result.summary = summary_run.content if hasattr(summary_run, "content") else str(summary_run)
+ from timmy.session import _clean_response
+ result.summary = _clean_response(result.summary)
+ except Exception as exc:
+ logger.error("Agentic loop summary failed: %s", exc)
+ result.summary = f"Completed {len(result.steps)} steps."
+
+ # Determine final status
+ if was_truncated:
+ result.status = "partial"
+ elif len(result.steps) < total_steps:
+ result.status = "partial"
+ elif any(s.status == "failed" for s in result.steps):
+ result.status = "partial"
+ else:
+ result.status = "completed"
+
+ result.total_duration_ms = int((time.monotonic() - start_time) * 1000)
+
+ await _broadcast_progress("agentic.task_complete", {
+ "task_id": task_id,
+ "status": result.status,
+ "steps_completed": len(result.steps),
+ "summary": result.summary[:300],
+ "duration_ms": result.total_duration_ms,
+ })
+
+ return result
+
+
+# ---------------------------------------------------------------------------
+# WebSocket broadcast helper
+# ---------------------------------------------------------------------------
+
+async def _broadcast_progress(event: str, data: dict) -> None:
+ """Broadcast agentic loop progress via WebSocket (best-effort)."""
+ try:
+ from infrastructure.ws_manager.handler import ws_manager
+ await ws_manager.broadcast(event, data)
+ except Exception:
+ logger.debug("Agentic loop: WS broadcast failed for %s", event)
diff --git a/src/timmy/briefing.py b/src/timmy/briefing.py
index f661ee00..11c86455 100644
--- a/src/timmy/briefing.py
+++ b/src/timmy/briefing.py
@@ -299,7 +299,12 @@ class BriefingEngine:
from timmy.agent import create_timmy
agent = create_timmy()
run = agent.run(prompt, stream=False)
- return run.content if hasattr(run, "content") else str(run)
+ result = run.content if hasattr(run, "content") else str(run)
+ # Ensure we always return an actual string (guards against
+ # MagicMock objects when agno is stubbed in tests).
+ if not isinstance(result, str):
+ return str(result)
+ return result
except Exception as exc:
logger.warning("Agent call failed during briefing generation: %s", exc)
return (
diff --git a/src/timmy/conversation.py b/src/timmy/conversation.py
index 5297cb86..dbca651c 100644
--- a/src/timmy/conversation.py
+++ b/src/timmy/conversation.py
@@ -62,10 +62,25 @@ class ConversationManager:
if session_id in self._contexts:
del self._contexts[session_id]
+ # Words that look like names but are actually verbs/UI states
+ _NAME_BLOCKLIST = frozenset({
+ "sending", "loading", "pending", "processing", "typing",
+ "working", "going", "trying", "looking", "getting", "doing",
+ "waiting", "running", "checking", "coming", "leaving",
+ "thinking", "reading", "writing", "watching", "listening",
+ "playing", "eating", "sleeping", "sitting", "standing",
+ "walking", "talking", "asking", "telling", "feeling",
+ "hoping", "wondering", "glad", "happy", "sorry", "sure",
+ "fine", "good", "great", "okay", "here", "there", "back",
+ "done", "ready", "busy", "free", "available", "interested",
+ "confused", "lost", "stuck", "curious", "excited", "tired",
+ "not", "also", "just", "still", "already", "currently",
+ })
+
def extract_user_name(self, message: str) -> Optional[str]:
"""Try to extract user's name from message."""
message_lower = message.lower()
-
+
# Common patterns
patterns = [
"my name is ",
@@ -73,16 +88,23 @@ class ConversationManager:
"i am ",
"call me ",
]
-
+
for pattern in patterns:
if pattern in message_lower:
idx = message_lower.find(pattern) + len(pattern)
remainder = message[idx:].strip()
+ if not remainder:
+ continue
# Take first word as name
name = remainder.split()[0].strip(".,!?;:")
+ if not name:
+ continue
+ # Reject common verbs, adjectives, and UI-state words
+ if name.lower() in self._NAME_BLOCKLIST:
+ continue
# Capitalize first letter
return name.capitalize()
-
+
return None
def should_use_tools(self, message: str, context: ConversationContext) -> bool:
diff --git a/src/timmy/prompts.py b/src/timmy/prompts.py
index 535814fc..d037c6b1 100644
--- a/src/timmy/prompts.py
+++ b/src/timmy/prompts.py
@@ -79,6 +79,22 @@ When faced with uncertainty, complexity, or ambiguous requests:
- **shell** — System operations (explicit user request)
- **memory_search** — Finding past context
+## Multi-Step Task Execution
+
+When a task requires multiple tool calls:
+1. Call the first tool and wait for results
+2. Evaluate: is the task complete? If not, call the next tool
+3. Continue until the task is fully done
+4. If a tool fails, try an alternative approach
+5. Summarize what you accomplished at the end
+
+IMPORTANT: Do NOT stop after one tool call unless the task is truly complete.
+If you used web_search and the user also asked you to write results to a file,
+call write_file next — don't just report the search results.
+
+For complex tasks with 3+ steps that may take time, use the plan_and_execute
+tool to run them in the background with progress tracking.
+
## Important: Response Style
- Never narrate your reasoning process. Just give the answer.
diff --git a/src/timmy/session.py b/src/timmy/session.py
index 7d5bd831..fea1bb8b 100644
--- a/src/timmy/session.py
+++ b/src/timmy/session.py
@@ -77,8 +77,12 @@ def chat(message: str, session_id: Optional[str] = None) -> str:
_extract_facts(message)
# Run with session_id so Agno retrieves history from SQLite
- run = agent.run(message, stream=False, session_id=sid)
- response_text = run.content if hasattr(run, "content") else str(run)
+ try:
+ run = agent.run(message, stream=False, session_id=sid)
+ response_text = run.content if hasattr(run, "content") else str(run)
+ except Exception as exc:
+ logger.error("Session: agent.run() failed: %s", exc)
+ return "I'm having trouble reaching my language model right now. Please try again shortly."
# Post-processing: clean up any leaked tool calls or chain-of-thought
response_text = _clean_response(response_text)
@@ -130,6 +134,10 @@ def _clean_response(text: str) -> str:
if not text:
return text
+    # Convert literal \n escape sequences to actual newlines
+    # NOTE(review): this also rewrites intentional "\\n" inside code snippets — confirm acceptable
+    text = text.replace("\\n", "\n")
+
# Strip JSON tool call blocks
text = _TOOL_CALL_JSON.sub("", text)
diff --git a/src/timmy/tools.py b/src/timmy/tools.py
index 16e22da1..b7222c29 100644
--- a/src/timmy/tools.py
+++ b/src/timmy/tools.py
@@ -455,6 +455,51 @@ def create_full_toolkit(base_dir: str | Path | None = None):
except Exception:
logger.debug("Memory tools not available")
+ # Agentic loop — background multi-step task execution
+ try:
+ from timmy.agentic_loop import run_agentic_loop
+
+ def plan_and_execute(task: str) -> str:
+ """Execute a complex multi-step task in the background with progress tracking.
+
+ Use this when a task requires 3 or more sequential tool calls that may
+ take significant time. The task will run in the background and stream
+ progress updates to the user via WebSocket.
+
+ Args:
+ task: Full description of the multi-step task to execute.
+
+            Returns:
+                Confirmation that background execution has started (runs synchronously if no event loop).
+ """
+ import asyncio
+ task_id = None
+
+ async def _launch():
+ nonlocal task_id
+ result = await run_agentic_loop(task)
+ return result
+
+ # Spawn as a background task on the running event loop
+ try:
+            asyncio.get_running_loop()  # raises RuntimeError if this thread has no loop
+            future = asyncio.ensure_future(_launch())
+            task_id = id(future)  # NOTE(review): keep `future` referenced or the task may be GC'd mid-flight
+            logger.info("Agentic loop started (task_id=%s, task=%s)", task_id, task[:80])
+        except RuntimeError:
+            # No running loop in this thread (common when agent.run executes via asyncio.to_thread) — blocks until done
+ result = asyncio.run(_launch())
+ return f"Task completed: {result.summary}"
+
+ return (
+ f"Background task started. I'll execute this step-by-step "
+ f"and stream progress updates. You can monitor via the dashboard."
+ )
+
+ toolkit.register(plan_and_execute, name="plan_and_execute")
+ except Exception:
+ logger.debug("plan_and_execute tool not available")
+
# System introspection - query runtime environment (sovereign self-knowledge)
try:
from timmy.tools_intro import (
diff --git a/src/timmy/tools_intro/__init__.py b/src/timmy/tools_intro/__init__.py
index 07ec0120..5f4cd7e3 100644
--- a/src/timmy/tools_intro/__init__.py
+++ b/src/timmy/tools_intro/__init__.py
@@ -134,7 +134,7 @@ def get_memory_status() -> dict[str, Any]:
tier1_info: dict[str, Any] = {
"exists": tier1_exists,
"path": str(memory_md),
- "preview": tier1_content[:200] if tier1_content else None,
+ "preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
}
if tier1_exists:
lines = memory_md.read_text().splitlines()
diff --git a/tests/e2e/test_agentic_chain.py b/tests/e2e/test_agentic_chain.py
new file mode 100644
index 00000000..9057cc98
--- /dev/null
+++ b/tests/e2e/test_agentic_chain.py
@@ -0,0 +1,102 @@
+"""E2E: verify multi-step tool chaining works end-to-end.
+
+These tests validate the full agentic loop pipeline: planning,
+execution, adaptation, and progress tracking.
+"""
+
+import pytest
+from unittest.mock import MagicMock, patch, AsyncMock
+from timmy.agentic_loop import run_agentic_loop
+
+
+def _mock_run(content: str):
+ """Create a mock return value for agent.run()."""
+ m = MagicMock()
+ m.content = content
+ return m
+
+
+@pytest.mark.asyncio
+async def test_multistep_chain_completes_all_steps():
+ """GREEN PATH: multi-step prompt executes all steps."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Search AI news\n2. Write to file\n3. Verify"),
+ _mock_run("Found 5 articles about AI in March 2026."),
+ _mock_run("Wrote summary to /tmp/ai_news.md"),
+ _mock_run("File exists, 15 lines."),
+ _mock_run("Searched, wrote, verified."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Search AI news and write summary to file")
+
+ assert result.status == "completed"
+ assert len(result.steps) == 3
+ assert mock_agent.run.call_count == 5 # plan + 3 steps + summary
+
+
+@pytest.mark.asyncio
+async def test_multistep_chain_adapts_on_failure():
+ """Step failure -> model adapts -> continues."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Read config\n2. Update setting\n3. Verify"),
+ _mock_run("Config: timeout=30"),
+ Exception("Permission denied"),
+ _mock_run("Adapted: wrote to ~/config.yaml instead"),
+ _mock_run("Verified: timeout=60"),
+ _mock_run("Updated config. Used ~/config.yaml due to permissions."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Update config timeout to 60")
+
+ assert result.status == "completed"
+ assert any(s.status == "adapted" for s in result.steps)
+
+
+@pytest.mark.asyncio
+async def test_max_steps_enforced():
+ """Loop stops at max_steps."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. A\n2. B\n3. C\n4. D\n5. E"),
+ _mock_run("A done"),
+ _mock_run("B done"),
+ _mock_run("Completed 2 of 5 steps."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Do 5 things", max_steps=2)
+
+ assert len(result.steps) == 2
+ assert result.status == "partial"
+
+
+@pytest.mark.asyncio
+async def test_progress_events_fire():
+ """Progress callback fires per step."""
+ events = []
+
+ async def on_progress(desc, step, total):
+ events.append((step, total))
+
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Do A\n2. Do B"),
+ _mock_run("A done"),
+ _mock_run("B done"),
+ _mock_run("All done"),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ await run_agentic_loop("Do A and B", on_progress=on_progress)
+
+ assert len(events) == 2
+ assert events[0] == (1, 2)
+ assert events[1] == (2, 2)
diff --git a/tests/test_agentic_loop.py b/tests/test_agentic_loop.py
new file mode 100644
index 00000000..1d5541b9
--- /dev/null
+++ b/tests/test_agentic_loop.py
@@ -0,0 +1,213 @@
+"""Unit tests for the agentic loop module.
+
+Tests cover planning, execution, max_steps enforcement, failure
+adaptation, progress callbacks, and response cleaning.
+"""
+
+import pytest
+from unittest.mock import MagicMock, patch, AsyncMock
+from timmy.agentic_loop import (
+ run_agentic_loop,
+ _parse_steps,
+ AgenticResult,
+ AgenticStep,
+)
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _mock_run(content: str):
+ """Create a mock return value for agent.run()."""
+ m = MagicMock()
+ m.content = content
+ return m
+
+
+# ---------------------------------------------------------------------------
+# _parse_steps
+# ---------------------------------------------------------------------------
+
+class TestParseSteps:
+ def test_numbered_with_dot(self):
+ text = "1. Search for data\n2. Write to file\n3. Verify"
+ assert _parse_steps(text) == ["Search for data", "Write to file", "Verify"]
+
+ def test_numbered_with_paren(self):
+ text = "1) Read config\n2) Update value\n3) Restart"
+ assert _parse_steps(text) == ["Read config", "Update value", "Restart"]
+
+ def test_fallback_plain_lines(self):
+ text = "Search the web\nWrite results\nDone"
+ assert _parse_steps(text) == ["Search the web", "Write results", "Done"]
+
+ def test_empty_returns_empty(self):
+ assert _parse_steps("") == []
+
+
+# ---------------------------------------------------------------------------
+# run_agentic_loop
+# ---------------------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_planning_phase_produces_steps():
+ """Planning prompt returns numbered step list."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Search AI news\n2. Write to file\n3. Verify"),
+ _mock_run("Found 5 articles about AI."),
+ _mock_run("Wrote summary to /tmp/ai_news.md"),
+ _mock_run("File verified, 15 lines."),
+ _mock_run("Searched, wrote, verified."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Search AI news and write summary")
+
+ assert result.status == "completed"
+ assert len(result.steps) == 3
+
+
+@pytest.mark.asyncio
+async def test_loop_executes_all_steps():
+ """Loop calls agent.run() for plan + each step + summary."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Do A\n2. Do B"),
+ _mock_run("A done"),
+ _mock_run("B done"),
+ _mock_run("All done"),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Do A and B")
+
+ # plan + 2 steps + summary = 4 calls
+ assert mock_agent.run.call_count == 4
+ assert len(result.steps) == 2
+
+
+@pytest.mark.asyncio
+async def test_loop_respects_max_steps():
+ """Loop stops at max_steps and returns status='partial'."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. A\n2. B\n3. C\n4. D\n5. E"),
+ _mock_run("A done"),
+ _mock_run("B done"),
+ _mock_run("Completed 2 of 5 steps."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Do 5 things", max_steps=2)
+
+ assert len(result.steps) == 2
+ assert result.status == "partial"
+
+
+@pytest.mark.asyncio
+async def test_failure_triggers_adaptation():
+ """Failed step feeds error back to model, step marked as adapted."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Read config\n2. Update setting\n3. Verify"),
+ _mock_run("Config: timeout=30"),
+ Exception("Permission denied"),
+ _mock_run("Adapted: wrote to ~/config.yaml instead"),
+ _mock_run("Verified: timeout=60"),
+ _mock_run("Updated config via alternative path."),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Update config timeout to 60")
+
+ assert result.status == "completed"
+ assert any(s.status == "adapted" for s in result.steps)
+
+
+@pytest.mark.asyncio
+async def test_progress_callback_fires():
+ """on_progress called for each step completion."""
+ events = []
+
+ async def on_progress(desc, step, total):
+ events.append((step, total))
+
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Do A\n2. Do B"),
+ _mock_run("A done"),
+ _mock_run("B done"),
+ _mock_run("All done"),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ await run_agentic_loop("Do A and B", on_progress=on_progress)
+
+ assert len(events) == 2
+ assert events[0] == (1, 2)
+ assert events[1] == (2, 2)
+
+
+@pytest.mark.asyncio
+async def test_result_contains_step_metadata():
+ """AgenticResult.steps has status and duration per step."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=[
+ _mock_run("1. Search\n2. Write"),
+ _mock_run("Found results"),
+ _mock_run("Written to file"),
+ _mock_run("Done"),
+ ])
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Search and write")
+
+ for step in result.steps:
+ assert step.status in ("completed", "failed", "adapted")
+ assert step.duration_ms >= 0
+ assert step.description
+ assert step.result
+
+
+@pytest.mark.asyncio
+async def test_config_default_used():
+ """When max_steps=0, uses settings.max_agent_steps."""
+ mock_agent = MagicMock()
+ # Return more steps than default config allows (10)
+ steps_text = "\n".join(f"{i}. Step {i}" for i in range(1, 15))
+ side_effects = [_mock_run(steps_text)]
+ # 10 step results + summary
+ for i in range(1, 11):
+ side_effects.append(_mock_run(f"Step {i} done"))
+ side_effects.append(_mock_run("Summary"))
+
+ mock_agent.run = MagicMock(side_effect=side_effects)
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Do 14 things", max_steps=0)
+
+ # Should be capped at 10 (config default)
+ assert len(result.steps) == 10
+
+
+@pytest.mark.asyncio
+async def test_planning_failure_returns_failed():
+ """If the planning phase fails, result.status is 'failed'."""
+ mock_agent = MagicMock()
+ mock_agent.run = MagicMock(side_effect=Exception("Model offline"))
+
+ with patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), \
+ patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
+ result = await run_agentic_loop("Do something")
+
+ assert result.status == "failed"
+ assert "Planning failed" in result.summary
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
new file mode 100644
index 00000000..1f2fed5c
--- /dev/null
+++ b/tests/test_smoke.py
@@ -0,0 +1,227 @@
+"""Smoke tests — verify every major page loads without uncaught exceptions.
+
+These tests catch regressions that unit tests miss: import errors,
+template rendering failures, database schema mismatches, and startup
+crashes. They run fast (no Ollama needed) and should stay green on
+every commit.
+"""
+
+import pytest
+from fastapi.testclient import TestClient
+
+
+@pytest.fixture
+def client():
+ from dashboard.app import app
+ with TestClient(app, raise_server_exceptions=False) as c:
+ yield c
+
+
+# ---------------------------------------------------------------------------
+# Core pages — these MUST return 200
+# ---------------------------------------------------------------------------
+
+class TestCorePages:
+ """Every core dashboard page loads without error."""
+
+ def test_index(self, client):
+ r = client.get("/")
+ assert r.status_code == 200
+
+ def test_health(self, client):
+ r = client.get("/health")
+ assert r.status_code == 200
+
+ def test_health_status(self, client):
+ r = client.get("/health/status")
+ assert r.status_code == 200
+
+ def test_agent_panel(self, client):
+ r = client.get("/agents/default/panel")
+ assert r.status_code == 200
+
+ def test_agent_history(self, client):
+ r = client.get("/agents/default/history")
+ assert r.status_code == 200
+
+
+# ---------------------------------------------------------------------------
+# Feature pages — should return 200 (or 307 redirect, never 500)
+# ---------------------------------------------------------------------------
+
+class TestFeaturePages:
+ """Feature pages load without 500 errors."""
+
+ def test_briefing(self, client):
+ r = client.get("/briefing")
+ assert r.status_code in (200, 307)
+
+ def test_thinking(self, client):
+ r = client.get("/thinking")
+ assert r.status_code == 200
+
+ def test_tools(self, client):
+ r = client.get("/tools")
+ assert r.status_code == 200
+
+ def test_memory(self, client):
+ r = client.get("/memory")
+ assert r.status_code == 200
+
+ def test_calm(self, client):
+ r = client.get("/calm")
+ assert r.status_code == 200
+
+ def test_tasks(self, client):
+ r = client.get("/tasks")
+ assert r.status_code == 200
+
+ def test_work_orders_queue(self, client):
+ r = client.get("/work-orders/queue")
+ assert r.status_code == 200
+
+ def test_mobile(self, client):
+ r = client.get("/mobile")
+ assert r.status_code == 200
+
+ def test_spark(self, client):
+ r = client.get("/spark")
+ assert r.status_code in (200, 307)
+
+ def test_models(self, client):
+ r = client.get("/models")
+ assert r.status_code == 200
+
+ def test_swarm_live(self, client):
+ r = client.get("/swarm/live")
+ assert r.status_code == 200
+
+ def test_swarm_events(self, client):
+ r = client.get("/swarm/events")
+ assert r.status_code == 200
+
+ def test_marketplace(self, client):
+ r = client.get("/marketplace")
+ assert r.status_code in (200, 307)
+
+
+# ---------------------------------------------------------------------------
+# JSON API endpoints — should return valid JSON, never 500
+# ---------------------------------------------------------------------------
+
+class TestAPIEndpoints:
+ """API endpoints return valid JSON without server errors."""
+
+ def test_health_json(self, client):
+ r = client.get("/health")
+ assert r.status_code == 200
+ data = r.json()
+ assert "status" in data
+
+ def test_health_components(self, client):
+ r = client.get("/health/components")
+ assert r.status_code == 200
+
+ def test_health_sovereignty(self, client):
+ r = client.get("/health/sovereignty")
+ assert r.status_code == 200
+
+ def test_queue_status(self, client):
+ r = client.get("/api/queue/status")
+ assert r.status_code == 200
+
+ def test_tasks_api(self, client):
+ r = client.get("/api/tasks")
+ assert r.status_code == 200
+
+ def test_chat_history(self, client):
+ r = client.get("/api/chat/history")
+ assert r.status_code == 200
+
+ def test_tools_stats(self, client):
+ r = client.get("/tools/api/stats")
+ assert r.status_code == 200
+
+ def test_thinking_api(self, client):
+ r = client.get("/thinking/api")
+ assert r.status_code == 200
+
+ def test_notifications_api(self, client):
+ r = client.get("/api/notifications")
+ assert r.status_code == 200
+
+ def test_providers_api(self, client):
+ r = client.get("/router/api/providers")
+ assert r.status_code == 200
+
+ def test_mobile_status(self, client):
+ r = client.get("/mobile/status")
+ assert r.status_code == 200
+
+ def test_discord_status(self, client):
+ r = client.get("/discord/status")
+ assert r.status_code == 200
+
+ def test_telegram_status(self, client):
+ r = client.get("/telegram/status")
+ assert r.status_code == 200
+
+ def test_grok_status(self, client):
+ r = client.get("/grok/status")
+ assert r.status_code == 200
+
+ def test_paperclip_status(self, client):
+ r = client.get("/api/paperclip/status")
+ assert r.status_code == 200
+
+
+# ---------------------------------------------------------------------------
+# No 500s — every GET route should survive without server error
+# ---------------------------------------------------------------------------
+
+class TestNo500:
+ """Verify that no page returns a 500 Internal Server Error."""
+
+ @pytest.mark.parametrize("path", [
+ "/",
+ "/health",
+ "/health/status",
+ "/health/sovereignty",
+ "/health/components",
+ "/agents/default/panel",
+ "/agents/default/history",
+ "/briefing",
+ "/thinking",
+ "/thinking/api",
+ "/tools",
+ "/tools/api/stats",
+ "/memory",
+ "/calm",
+ "/tasks",
+ "/tasks/pending",
+ "/tasks/active",
+ "/tasks/completed",
+ "/work-orders/queue",
+ "/work-orders/queue/pending",
+ "/work-orders/queue/active",
+ "/mobile",
+ "/mobile/status",
+ "/spark",
+ "/models",
+ "/swarm/live",
+ "/swarm/events",
+ "/marketplace",
+ "/api/queue/status",
+ "/api/tasks",
+ "/api/chat/history",
+ "/api/notifications",
+ "/router/api/providers",
+ "/discord/status",
+ "/telegram/status",
+ "/grok/status",
+ "/grok/stats",
+ "/api/paperclip/status",
+ ])
+ def test_no_500(self, client, path):
+ r = client.get(path)
+ assert r.status_code != 500, f"GET {path} returned 500"
diff --git a/tests/timmy/test_grok_backend.py b/tests/timmy/test_grok_backend.py
index 688ded4a..693e049e 100644
--- a/tests/timmy/test_grok_backend.py
+++ b/tests/timmy/test_grok_backend.py
@@ -249,14 +249,14 @@ def test_consult_grok_calls_backend_when_available():
# ── Grok dashboard route tests ─────────────────────────────────────────────
def test_grok_status_endpoint(client):
- """GET /grok/status returns JSON with Grok configuration."""
+ """GET /grok/status returns HTML dashboard page."""
response = client.get("/grok/status")
assert response.status_code == 200
- data = response.json()
- assert "enabled" in data
- assert "available" in data
- assert "model" in data
- assert "api_key_set" in data
+ assert "text/html" in response.headers.get("content-type", "")
+ # Verify key status info is present in the rendered HTML
+ text = response.text
+ assert "Grok Status" in text
+ assert "Status" in text
def test_grok_toggle_returns_html(client):