1
0

Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander Whitestone
30376d1bb4 feat: Agent Dreaming Mode — idle background reflection loop (#1019)
- Add src/timmy/dreaming.py with DreamingEngine: rule synthesis from
  session logs during idle periods with configurable wake/sleep thresholds
- Add src/dashboard/routes/dreaming.py with REST endpoints for
  dreaming status, history, and manual trigger
- Add dreaming_status.html partial template for HTMX polling
- Wire dreaming_router into dashboard app
- Add dreaming CSS styles to mission-control.css
- Add dreaming_enabled/dreaming_idle_threshold_s config settings
- 17 unit tests all passing

Fixes #1019

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 18:56:21 -04:00
d697c3d93e [claude] refactor: break up monolithic tools.py into a tools/ package (#1215) (#1221) 2026-03-23 22:43:09 +00:00
12 changed files with 1577 additions and 531 deletions

View File

@@ -300,6 +300,14 @@ class Settings(BaseSettings):
thinking_memory_check_every: int = 50 # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
# ── Dreaming Mode ─────────────────────────────────────────────────
# When enabled, the agent replays past sessions during idle time to
# simulate alternative actions and propose behavioural rules.
dreaming_enabled: bool = True
dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
dreaming_cycle_seconds: int = 600 # seconds between dream attempts
dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.

View File

@@ -57,6 +57,7 @@ from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.dreaming import router as dreaming_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import matrix_router
from dashboard.routes.world import router as world_router
@@ -249,6 +250,36 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
async def _dreaming_scheduler() -> None:
"""Background task: run idle-time dreaming cycles.
When the system has been idle for ``dreaming_idle_threshold_minutes``,
the dreaming engine replays a past session and simulates alternatives.
"""
from timmy.dreaming import dreaming_engine
await asyncio.sleep(15) # Stagger after loop QA scheduler
while True:
try:
if settings.dreaming_enabled:
await asyncio.wait_for(
dreaming_engine.dream_once(),
timeout=settings.dreaming_timeout_seconds + 10,
)
except TimeoutError:
logger.warning(
"Dreaming cycle timed out after %ds",
settings.dreaming_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Dreaming scheduler error: %s", exc)
await asyncio.sleep(settings.dreaming_cycle_seconds)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -409,6 +440,7 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_dreaming_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
@@ -676,6 +708,7 @@ app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(dreaming_router)
@app.websocket("/ws")

View File

@@ -0,0 +1,84 @@
"""Dreaming mode dashboard routes.
GET /dreaming/api/status — JSON status of the dreaming engine
GET /dreaming/api/recent — JSON list of recent dream records
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
GET /dreaming/partial — HTMX partial: dreaming status panel
"""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.templating import templates
from timmy.dreaming import dreaming_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
@router.get("/api/status", response_class=JSONResponse)
async def dreaming_status():
"""Return current dreaming engine status as JSON."""
return dreaming_engine.get_status()
@router.get("/api/recent", response_class=JSONResponse)
async def dreaming_recent(limit: int = 10):
"""Return recent dream records as JSON."""
dreams = dreaming_engine.get_recent_dreams(limit=limit)
return [
{
"id": d.id,
"session_excerpt": d.session_excerpt[:200],
"decision_point": d.decision_point[:200],
"simulation": d.simulation,
"proposed_rule": d.proposed_rule,
"created_at": d.created_at,
}
for d in dreams
]
@router.post("/api/trigger", response_class=JSONResponse)
async def dreaming_trigger():
"""Manually trigger a dream cycle (bypasses idle check).
Useful for testing and manual inspection. Forces idle state temporarily.
"""
from datetime import UTC, datetime, timedelta
from config import settings
# Temporarily back-date last activity to appear idle
original_time = dreaming_engine._last_activity_time
dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
minutes=settings.dreaming_idle_threshold_minutes + 1
)
try:
dream = await dreaming_engine.dream_once()
finally:
dreaming_engine._last_activity_time = original_time
if dream:
return {
"status": "ok",
"dream_id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
}
return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
@router.get("/partial", response_class=HTMLResponse)
async def dreaming_partial(request: Request):
"""HTMX partial: dreaming status panel for the dashboard."""
status = dreaming_engine.get_status()
recent = dreaming_engine.get_recent_dreams(limit=5)
return templates.TemplateResponse(
request,
"partials/dreaming_status.html",
{"status": status, "recent_dreams": recent},
)

View File

@@ -0,0 +1,32 @@
{% if not status.enabled %}
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
{% elif status.dreaming %}
<div class="dream-active">
<span class="dream-pulse"></span>
<span class="dream-label">DREAMING</span>
<div class="dream-summary">{{ status.current_summary }}</div>
</div>
{% elif status.idle %}
<div class="dream-idle">
<span class="dream-dot dream-dot-idle"></span>
<span class="dream-label-idle">IDLE</span>
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
</div>
{% else %}
<div class="dream-standby">
<span class="dream-dot dream-dot-standby"></span>
<span class="dream-label-standby">STANDBY</span>
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
</div>
{% endif %}
{% if recent_dreams %}
<div class="dream-history mt-2">
{% for d in recent_dreams %}
<div class="dream-record">
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
</div>
{% endfor %}
</div>
{% endif %}

435
src/timmy/dreaming.py Normal file
View File

@@ -0,0 +1,435 @@
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
When the dashboard has been idle for a configurable period, this engine
selects a past chat session, identifies key agent response points, and
asks the LLM to simulate alternative approaches. Insights are stored as
proposed rules that can feed the auto-crystallizer or memory system.
Usage::
from timmy.dreaming import dreaming_engine
# Run one dream cycle (called by the background scheduler)
await dreaming_engine.dream_once()
# Query recent dreams
dreams = dreaming_engine.get_recent_dreams(limit=10)
# Get current status dict for API/dashboard
status = dreaming_engine.get_status()
"""
import json
import logging
import re
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path("data/dreams.db")
# Strip <think> tags from reasoning model output
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Minimum messages in a session to be worth replaying
_MIN_SESSION_MESSAGES = 3
# Gap in seconds between messages that signals a new session
_SESSION_GAP_SECONDS = 1800 # 30 minutes
@dataclass
class DreamRecord:
"""A single completed dream cycle."""
id: str
session_excerpt: str # Short excerpt from the replayed session
decision_point: str # The agent message that was re-simulated
simulation: str # The alternative response generated
proposed_rule: str # Rule extracted from the simulation
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS dreams (
id TEXT PRIMARY KEY,
session_excerpt TEXT NOT NULL,
decision_point TEXT NOT NULL,
simulation TEXT NOT NULL,
proposed_rule TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
conn.commit()
yield conn
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
return DreamRecord(
id=row["id"],
session_excerpt=row["session_excerpt"],
decision_point=row["decision_point"],
simulation=row["simulation"],
proposed_rule=row["proposed_rule"],
created_at=row["created_at"],
)
class DreamingEngine:
"""Idle-time dreaming engine — replays sessions and simulates alternatives."""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_activity_time: datetime = datetime.now(UTC)
self._is_dreaming: bool = False
self._current_dream_summary: str = ""
self._dreaming_agent = None # Lazy-initialised
# ── Public API ────────────────────────────────────────────────────────
def record_activity(self) -> None:
"""Reset the idle timer — call this on every user/agent interaction."""
self._last_activity_time = datetime.now(UTC)
def is_idle(self) -> bool:
"""Return True if the system has been idle long enough to start dreaming."""
threshold = settings.dreaming_idle_threshold_minutes
if threshold <= 0:
return False
return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
def get_status(self) -> dict[str, Any]:
"""Return a status dict suitable for API/dashboard consumption."""
return {
"enabled": settings.dreaming_enabled,
"dreaming": self._is_dreaming,
"idle": self.is_idle(),
"current_summary": self._current_dream_summary,
"idle_minutes": int(
(datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
),
"idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
"dream_count": self.count_dreams(),
}
async def dream_once(self) -> DreamRecord | None:
"""Execute one dream cycle.
Returns the stored DreamRecord, or None if the cycle was skipped
(not idle, dreaming disabled, no suitable session, or LLM error).
"""
if not settings.dreaming_enabled:
return None
if not self.is_idle():
logger.debug(
"Dreaming skipped — system active (idle for %d min, threshold %d min)",
int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
settings.dreaming_idle_threshold_minutes,
)
return None
if self._is_dreaming:
logger.debug("Dreaming skipped — cycle already in progress")
return None
self._is_dreaming = True
self._current_dream_summary = "Selecting a past session…"
await self._broadcast_status()
try:
return await self._run_dream_cycle()
except Exception as exc:
logger.warning("Dream cycle failed: %s", exc)
return None
finally:
self._is_dreaming = False
self._current_dream_summary = ""
await self._broadcast_status()
def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
"""Retrieve the most recent dream records."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_dream(r) for r in rows]
def count_dreams(self) -> int:
"""Return total number of stored dream records."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
return row["c"] if row else 0
# ── Private helpers ───────────────────────────────────────────────────
async def _run_dream_cycle(self) -> DreamRecord | None:
"""Core dream logic: select → simulate → store."""
# 1. Select a past session from the chat log
session = await self._select_session()
if not session:
logger.debug("No suitable chat session found for dreaming")
self._current_dream_summary = "No past sessions to replay"
return None
decision_point, session_excerpt = session
self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}"
await self._broadcast_status()
# 2. Simulate an alternative response
simulation = await self._simulate_alternative(decision_point, session_excerpt)
if not simulation:
logger.debug("Dream simulation produced no output")
return None
# 3. Extract a proposed rule
proposed_rule = await self._extract_rule(decision_point, simulation)
# 4. Store and broadcast
dream = self._store_dream(
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
)
self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
logger.info(
"Dream [%s]: replayed session, proposed rule: %s",
dream.id[:8],
proposed_rule[:80] if proposed_rule else "(none)",
)
await self._broadcast_status()
await self._broadcast_dream(dream)
return dream
async def _select_session(self) -> tuple[str, str] | None:
"""Select a past chat session and return (decision_point, session_excerpt).
Uses the SQLite chat store. Groups messages into sessions by time
gap. Picks a random session with enough messages, then selects one
agent response as the decision point.
"""
try:
from infrastructure.chat_store import DB_PATH
if not DB_PATH.exists():
return None
import asyncio
rows = await asyncio.to_thread(self._load_chat_rows)
if not rows:
return None
sessions = self._group_into_sessions(rows)
if not sessions:
return None
# Filter sessions with enough messages
valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
if not valid:
return None
import random
session = random.choice(valid) # noqa: S311 (not cryptographic)
# Build a short text excerpt (last N messages)
excerpt_msgs = session[-6:]
excerpt = "\n".join(
f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
)
# Find agent responses as candidate decision points
agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
if not agent_msgs:
return None
decision = random.choice(agent_msgs) # noqa: S311
return decision["content"], excerpt
except Exception as exc:
logger.warning("Session selection failed: %s", exc)
return None
def _load_chat_rows(self) -> list[dict]:
"""Synchronously load chat messages from SQLite."""
from infrastructure.chat_store import DB_PATH
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, timestamp FROM chat_messages "
"ORDER BY timestamp ASC"
).fetchall()
return [dict(r) for r in rows]
def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
"""Group chat rows into sessions based on time gaps."""
if not rows:
return []
sessions: list[list[dict]] = []
current: list[dict] = [rows[0]]
for prev, curr in zip(rows, rows[1:]):
try:
t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
gap = (t_curr - t_prev).total_seconds()
except Exception:
gap = 0
if gap > _SESSION_GAP_SECONDS:
sessions.append(current)
current = [curr]
else:
current.append(curr)
sessions.append(current)
return sessions
async def _simulate_alternative(
self, decision_point: str, session_excerpt: str
) -> str:
"""Ask the LLM to simulate an alternative response."""
prompt = (
"You are Timmy, a sovereign AI agent in a dreaming state.\n"
"You are replaying a past conversation and exploring what you could "
"have done differently at a key decision point.\n\n"
"PAST SESSION EXCERPT:\n"
f"{session_excerpt}\n\n"
"KEY DECISION POINT (your past response):\n"
f"{decision_point[:500]}\n\n"
"TASK: In 2-3 sentences, describe ONE concrete alternative approach "
"you could have taken at this decision point that would have been "
"more helpful, more accurate, or more efficient.\n"
"Be specific — reference the actual content of the conversation.\n"
"Do NOT include meta-commentary about dreaming or this exercise.\n\n"
"Alternative approach:"
)
raw = await self._call_agent(prompt)
return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
async def _extract_rule(self, decision_point: str, simulation: str) -> str:
"""Extract a proposed behaviour rule from the simulation."""
prompt = (
"Given this pair of agent responses:\n\n"
f"ORIGINAL: {decision_point[:300]}\n\n"
f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
"Extract ONE concise rule (max 20 words) that captures what to do "
"differently next time. Format: 'When X, do Y instead of Z.'\n"
"Rule:"
)
raw = await self._call_agent(prompt)
rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
# Keep only the first sentence/line
rule = rule.split("\n")[0].strip().rstrip(".")
return rule[:200] # Safety cap
async def _call_agent(self, prompt: str) -> str:
"""Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
import asyncio
if self._dreaming_agent is None:
from timmy.agent import create_timmy
self._dreaming_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(settings.dreaming_timeout_seconds):
run = await self._dreaming_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
return ""
except Exception as exc:
logger.warning("Dreaming LLM call failed: %s", exc)
return ""
raw = run.content if hasattr(run, "content") else str(run)
return raw or ""
def _store_dream(
self,
*,
session_excerpt: str,
decision_point: str,
simulation: str,
proposed_rule: str,
) -> DreamRecord:
dream = DreamRecord(
id=str(uuid.uuid4()),
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
created_at=datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO dreams
(id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
dream.id,
dream.session_excerpt,
dream.decision_point,
dream.simulation,
dream.proposed_rule,
dream.created_at,
),
)
conn.commit()
return dream
async def _broadcast_status(self) -> None:
"""Push current dreaming status via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("dreaming_state", self.get_status())
except Exception as exc:
logger.debug("Dreaming status broadcast failed: %s", exc)
async def _broadcast_dream(self, dream: DreamRecord) -> None:
"""Push a completed dream record via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"dreaming_complete",
{
"id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
"created_at": dream.created_at,
},
)
except Exception as exc:
logger.debug("Dreaming complete broadcast failed: %s", exc)
# Module-level singleton
dreaming_engine = DreamingEngine()

View File

@@ -0,0 +1,94 @@
"""Tool integration for the agent swarm.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
Sub-modules:
- _base: shared types, tracking state
- file_tools: file-operation toolkit factories (Echo, Quill, Seer)
- system_tools: calculator, AI tools, code/devops toolkit factories
- _registry: full toolkit construction, agent registry, tool catalog
"""
# Re-export everything for backward compatibility — callers that do
# ``from timmy.tools import <symbol>`` continue to work unchanged.
from timmy.tools._base import (
AgentTools,
PersonaTools,
ToolStats,
_AGNO_TOOLS_AVAILABLE,
_ImportError,
_TOOL_USAGE,
_track_tool_usage,
get_tool_stats,
)
from timmy.tools._registry import (
AGENT_TOOLKITS,
PERSONA_TOOLKITS,
_create_stub_toolkit,
_merge_catalog,
create_experiment_tools,
create_full_toolkit,
get_all_available_tools,
get_tools_for_agent,
get_tools_for_persona,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
_safe_eval,
calculator,
consult_grok,
create_aider_tool,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
__all__ = [
# _base
"AgentTools",
"PersonaTools",
"ToolStats",
"_AGNO_TOOLS_AVAILABLE",
"_ImportError",
"_TOOL_USAGE",
"_track_tool_usage",
"get_tool_stats",
# file_tools
"_make_smart_read_file",
"create_data_tools",
"create_research_tools",
"create_writing_tools",
# system_tools
"_safe_eval",
"calculator",
"consult_grok",
"create_aider_tool",
"create_code_tools",
"create_devops_tools",
"create_security_tools",
"web_fetch",
# _registry
"AGENT_TOOLKITS",
"PERSONA_TOOLKITS",
"_create_stub_toolkit",
"_merge_catalog",
"create_experiment_tools",
"create_full_toolkit",
"get_all_available_tools",
"get_tools_for_agent",
"get_tools_for_persona",
]

90
src/timmy/tools/_base.py Normal file
View File

@@ -0,0 +1,90 @@
"""Base types, shared state, and tracking for the Timmy tool system."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: "Toolkit"
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats

View File

@@ -1,532 +1,48 @@
"""Tool integration for the agent swarm.
"""Tool registry, full toolkit construction, and tool catalog.
Provides agents with capabilities for:
- File read/write (local filesystem)
- Shell command execution (sandboxed)
- Python code execution
- Git operations
- Image / Music / Video generation (creative pipeline)
Tools are assigned to agents based on their specialties.
Provides:
- Internal _register_* helpers for wiring tools into toolkits
- create_full_toolkit (orchestrator toolkit)
- create_experiment_tools (Lab agent toolkit)
- AGENT_TOOLKITS / get_tools_for_agent registry
- get_all_available_tools catalog
"""
from __future__ import annotations
import ast
import logging
import math
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from config import settings
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import (
_make_smart_read_file,
create_data_tools,
create_research_tools,
create_writing_tools,
)
from timmy.tools.system_tools import (
calculator,
consult_grok,
create_code_tools,
create_devops_tools,
create_security_tools,
web_fetch,
)
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
# Lazy imports to handle test mocking
_ImportError = None
try:
from agno.tools import Toolkit
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
_AGNO_TOOLS_AVAILABLE = True
except ImportError as e:
_AGNO_TOOLS_AVAILABLE = False
_ImportError = e
# Track tool usage stats
_TOOL_USAGE: dict[str, list[dict]] = {}
@dataclass
class ToolStats:
"""Statistics for a single tool."""
tool_name: str
call_count: int = 0
last_used: str | None = None
errors: int = 0
@dataclass
class AgentTools:
"""Tools assigned to an agent."""
agent_id: str
agent_name: str
toolkit: Toolkit
available_tools: list[str] = field(default_factory=list)
# Backward-compat alias
PersonaTools = AgentTools
def _track_tool_usage(agent_id: str, tool_name: str, success: bool = True) -> None:
"""Track tool usage for analytics."""
if agent_id not in _TOOL_USAGE:
_TOOL_USAGE[agent_id] = []
_TOOL_USAGE[agent_id].append(
{
"tool": tool_name,
"timestamp": datetime.now(UTC).isoformat(),
"success": success,
}
)
def get_tool_stats(agent_id: str | None = None) -> dict:
"""Get tool usage statistics.
Args:
agent_id: Optional agent ID to filter by. If None, returns stats for all agents.
Returns:
Dict with tool usage statistics.
"""
if agent_id:
usage = _TOOL_USAGE.get(agent_id, [])
return {
"agent_id": agent_id,
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
"recent_calls": usage[-10:] if usage else [],
}
# Return stats for all agents
all_stats = {}
for aid, usage in _TOOL_USAGE.items():
all_stats[aid] = {
"total_calls": len(usage),
"tools_used": list(set(u["tool"] for u in usage)),
}
return all_stats
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def _make_smart_read_file(file_tools: FileTools) -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
import subprocess
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
# ---------------------------------------------------------------------------
# Internal _register_* helpers
# ---------------------------------------------------------------------------
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
@@ -717,6 +233,11 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
raise
# ---------------------------------------------------------------------------
# Full toolkit factories
# ---------------------------------------------------------------------------
def create_full_toolkit(base_dir: str | Path | None = None):
"""Create a full toolkit with all available tools (for the orchestrator).
@@ -727,6 +248,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):
# Return None when tools aren't available (tests)
return None
from config import settings
from timmy.tool_safety import DANGEROUS_TOOLS
toolkit = Toolkit(name="full")
@@ -808,19 +330,9 @@ def create_experiment_tools(base_dir: str | Path | None = None):
return toolkit
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
"mace": create_security_tools,
"helm": create_devops_tools,
"seer": create_data_tools,
"forge": create_code_tools,
"quill": create_writing_tools,
"lab": create_experiment_tools,
"pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
"lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
"reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
# ---------------------------------------------------------------------------
# Agent toolkit registry
# ---------------------------------------------------------------------------
def _create_stub_toolkit(name: str):
@@ -836,7 +348,22 @@ def _create_stub_toolkit(name: str):
return toolkit
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> Toolkit | None:
# Mapping of agent IDs to their toolkits
AGENT_TOOLKITS: dict[str, Callable[[], Toolkit]] = {
"echo": create_research_tools,
"mace": create_security_tools,
"helm": create_devops_tools,
"seer": create_data_tools,
"forge": create_code_tools,
"quill": create_writing_tools,
"lab": create_experiment_tools,
"pixel": lambda base_dir=None: _create_stub_toolkit("pixel"),
"lyra": lambda base_dir=None: _create_stub_toolkit("lyra"),
"reel": lambda base_dir=None: _create_stub_toolkit("reel"),
}
def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> "Toolkit | None":
"""Get the appropriate toolkit for an agent.
Args:
@@ -852,11 +379,16 @@ def get_tools_for_agent(agent_id: str, base_dir: str | Path | None = None) -> To
return None
# Backward-compat alias
# Backward-compat aliases
get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
# ---------------------------------------------------------------------------
# Tool catalog
# ---------------------------------------------------------------------------
def _core_tool_catalog() -> dict:
"""Return core file and execution tools catalog entries."""
return {

View File

@@ -0,0 +1,121 @@
"""File operation tools and agent toolkit factories for file-heavy agents.
Provides:
- Smart read_file wrapper (auto-lists directories)
- Toolkit factories for Echo (research), Quill (writing), Seer (data)
"""
from __future__ import annotations
import logging
from collections.abc import Callable
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
Toolkit,
)
logger = logging.getLogger(__name__)
def _make_smart_read_file(file_tools: "FileTools") -> Callable:
"""Wrap FileTools.read_file so directories auto-list their contents.
When the user (or the LLM) passes a directory path to read_file,
the raw Agno implementation throws an IsADirectoryError. This
wrapper detects that case, lists the directory entries, and returns
a helpful message so the model can pick the right file on its own.
"""
original_read = file_tools.read_file
def smart_read_file(file_name: str = "", encoding: str = "utf-8", **kwargs) -> str:
"""Reads the contents of the file `file_name` and returns the contents if successful."""
# LLMs often call read_file(path=...) instead of read_file(file_name=...)
if not file_name:
file_name = kwargs.get("path", "")
if not file_name:
return "Error: no file_name or path provided."
# Resolve the path the same way FileTools does
_safe, resolved = file_tools.check_escape(file_name)
if _safe and resolved.is_dir():
entries = sorted(p.name for p in resolved.iterdir() if not p.name.startswith("."))
listing = "\n".join(f" - {e}" for e in entries) if entries else " (empty directory)"
return (
f"'{file_name}' is a directory, not a file. "
f"Files inside:\n{listing}\n\n"
"Please call read_file with one of the files listed above."
)
return original_read(file_name, encoding=encoding)
# Preserve the original docstring for Agno tool schema generation
smart_read_file.__doc__ = original_read.__doc__
return smart_read_file
def create_research_tools(base_dir: str | Path | None = None):
"""Create tools for the research agent (Echo).
Includes: file reading
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="research")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_writing_tools(base_dir: str | Path | None = None):
"""Create tools for the writing agent (Quill).
Includes: file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="writing")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_data_tools(base_dir: str | Path | None = None):
"""Create tools for the data agent (Seer).
Includes: python execution, file reading, web search for data sources
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="data")
# Python execution for analysis
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File reading
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit

View File

@@ -0,0 +1,357 @@
"""System, calculation, and AI consultation tools for Timmy agents.
Provides:
- Safe AST-based calculator
- consult_grok (xAI frontier reasoning)
- web_fetch (content extraction)
- Toolkit factories for Forge (code), Mace (security), Helm (devops)
"""
from __future__ import annotations
import ast
import logging
import math
import subprocess
from pathlib import Path
from timmy.tools._base import (
_AGNO_TOOLS_AVAILABLE,
_ImportError,
FileTools,
PythonTools,
ShellTools,
Toolkit,
)
from timmy.tools.file_tools import _make_smart_read_file
logger = logging.getLogger(__name__)
# Max characters of user query included in Lightning invoice memo
_INVOICE_MEMO_MAX_LEN = 50
def _safe_eval(node, allowed_names: dict):
"""Walk an AST and evaluate only safe numeric operations."""
if isinstance(node, ast.Expression):
return _safe_eval(node.body, allowed_names)
if isinstance(node, ast.Constant):
if isinstance(node.value, (int, float, complex)):
return node.value
raise ValueError(f"Unsupported constant: {node.value!r}")
if isinstance(node, ast.UnaryOp):
operand = _safe_eval(node.operand, allowed_names)
if isinstance(node.op, ast.UAdd):
return +operand
if isinstance(node.op, ast.USub):
return -operand
raise ValueError(f"Unsupported unary op: {type(node.op).__name__}")
if isinstance(node, ast.BinOp):
left = _safe_eval(node.left, allowed_names)
right = _safe_eval(node.right, allowed_names)
ops = {
ast.Add: lambda a, b: a + b,
ast.Sub: lambda a, b: a - b,
ast.Mult: lambda a, b: a * b,
ast.Div: lambda a, b: a / b,
ast.FloorDiv: lambda a, b: a // b,
ast.Mod: lambda a, b: a % b,
ast.Pow: lambda a, b: a**b,
}
op_fn = ops.get(type(node.op))
if op_fn is None:
raise ValueError(f"Unsupported binary op: {type(node.op).__name__}")
return op_fn(left, right)
if isinstance(node, ast.Name):
if node.id in allowed_names:
return allowed_names[node.id]
raise ValueError(f"Unknown name: {node.id!r}")
if isinstance(node, ast.Attribute):
value = _safe_eval(node.value, allowed_names)
# Only allow attribute access on the math module
if value is math:
attr = getattr(math, node.attr, None)
if attr is not None:
return attr
raise ValueError(f"Attribute access not allowed: .{node.attr}")
if isinstance(node, ast.Call):
func = _safe_eval(node.func, allowed_names)
if not callable(func):
raise ValueError(f"Not callable: {func!r}")
args = [_safe_eval(a, allowed_names) for a in node.args]
kwargs = {kw.arg: _safe_eval(kw.value, allowed_names) for kw in node.keywords}
return func(*args, **kwargs)
raise ValueError(f"Unsupported syntax: {type(node).__name__}")
def calculator(expression: str) -> str:
"""Evaluate a mathematical expression and return the exact result.
Use this tool for ANY arithmetic: multiplication, division, square roots,
exponents, percentages, logarithms, trigonometry, etc.
Args:
expression: A valid Python math expression, e.g. '347 * 829',
'math.sqrt(17161)', '2**10', 'math.log(100, 10)'.
Returns:
The exact result as a string.
"""
allowed_names = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
allowed_names["math"] = math
allowed_names["abs"] = abs
allowed_names["round"] = round
allowed_names["min"] = min
allowed_names["max"] = max
try:
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree, allowed_names)
return str(result)
except Exception as e: # broad catch intentional: arbitrary code execution
return f"Error evaluating '{expression}': {e}"
def consult_grok(query: str) -> str:
"""Consult Grok (xAI) for frontier reasoning on complex questions.
Use this tool when a question requires advanced reasoning, real-time
knowledge, or capabilities beyond the local model. Grok is a premium
cloud backend — use sparingly and only for high-complexity queries.
Args:
query: The question or reasoning task to send to Grok.
Returns:
Grok's response text, or an error/status message.
"""
from config import settings
from timmy.backends import get_grok_backend, grok_available
if not grok_available():
return (
"Grok is not available. Enable with GROK_ENABLED=true "
"and set XAI_API_KEY in your .env file."
)
backend = get_grok_backend()
# Log to Spark if available
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="default",
tool_name="consult_grok",
success=True,
)
except (ImportError, AttributeError) as exc:
logger.warning("Tool execution failed (consult_grok logging): %s", exc)
# Generate Lightning invoice for monetization (unless free mode)
invoice_info = ""
if not settings.grok_free:
try:
from lightning.factory import get_backend as get_ln_backend
ln = get_ln_backend()
sats = min(settings.grok_max_sats_per_query, settings.grok_sats_hard_cap)
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
result = backend.run(query)
response = result.content
if invoice_info:
response += invoice_info
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
def create_aider_tool(base_path: Path):
"""Create an Aider tool for AI-assisted coding."""
class AiderTool:
"""Tool that calls Aider (local AI coding assistant) for code generation."""
def __init__(self, base_dir: Path):
self.base_dir = base_dir
def run_aider(self, prompt: str, model: str = "qwen3:30b") -> str:
"""Run Aider to generate code changes.
Args:
prompt: What you want Aider to do (e.g., "add a fibonacci function")
model: Ollama model to use (default: qwen3:30b)
Returns:
Aider's response with the code changes made
"""
try:
# Run aider with the prompt
result = subprocess.run(
[
"aider",
"--no-git",
"--model",
f"ollama/{model}",
"--quiet",
prompt,
],
capture_output=True,
text=True,
timeout=120,
cwd=str(self.base_dir),
)
if result.returncode == 0:
return result.stdout if result.stdout else "Code changes applied successfully"
else:
return f"Aider error: {result.stderr}"
except FileNotFoundError:
return "Error: Aider not installed. Run: pip install aider"
except subprocess.TimeoutExpired:
return "Error: Aider timed out after 120 seconds"
except (OSError, subprocess.SubprocessError) as e:
return f"Error running Aider: {str(e)}"
return AiderTool(base_path)
def create_code_tools(base_dir: str | Path | None = None):
"""Create tools for the code agent (Forge).
Includes: shell commands, python execution, file read/write, Aider AI assist
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="code")
# Shell commands (sandboxed)
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
# File operations
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
# Aider AI coding assistant (local with Ollama)
aider_tool = create_aider_tool(base_path)
toolkit.register(aider_tool.run_aider, name="aider")
return toolkit
def create_security_tools(base_dir: str | Path | None = None):
"""Create tools for the security agent (Mace).
Includes: shell commands (for scanning), file read
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="security")
# Shell for running security scans
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File reading for logs/configs
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit
def create_devops_tools(base_dir: str | Path | None = None):
"""Create tools for the DevOps agent (Helm).
Includes: shell commands, file read/write
"""
if not _AGNO_TOOLS_AVAILABLE:
raise ImportError(f"Agno tools not available: {_ImportError}")
toolkit = Toolkit(name="devops")
# Shell for deployment commands
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
# File operations for config management
from config import settings
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
toolkit.register(file_tools.list_files, name="list_files")
return toolkit

View File

@@ -2549,6 +2549,7 @@
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
/* ── Voice settings ───────────────────────────────────────── */
.voice-settings-page { max-width: 600px; margin: 0 auto; }
@@ -2714,3 +2715,45 @@
padding: 0.3rem 0.6rem;
margin-bottom: 0.5rem;
}
/* ═══════════════════════════════════════════════════════════════
Dreaming Mode
═══════════════════════════════════════════════════════════════ */
.dream-active {
display: flex; align-items: center; gap: 8px;
padding: 6px 0;
}
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
.dream-pulse {
display: inline-block; width: 8px; height: 8px; border-radius: 50%;
background: var(--purple);
animation: dream-pulse 1.8s ease-in-out infinite;
}
@keyframes dream-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(0.7); }
}
.dream-dot {
display: inline-block; width: 7px; height: 7px; border-radius: 50%;
}
.dream-dot-idle { background: var(--amber); }
.dream-dot-standby { background: var(--text-dim); }
.dream-idle, .dream-standby {
display: flex; align-items: center; gap: 6px; padding: 4px 0;
}
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
.dream-record:last-child { border-bottom: none; }
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }

217
tests/unit/test_dreaming.py Normal file
View File

@@ -0,0 +1,217 @@
"""Unit tests for the Dreaming mode engine."""
import sqlite3
from contextlib import closing
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dreaming import DreamingEngine, DreamRecord, _SESSION_GAP_SECONDS
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def tmp_dreams_db(tmp_path):
"""Return a temporary path for the dreams database."""
return tmp_path / "dreams.db"
@pytest.fixture()
def engine(tmp_dreams_db):
"""DreamingEngine backed by a temp database."""
return DreamingEngine(db_path=tmp_dreams_db)
@pytest.fixture()
def chat_db(tmp_path):
"""Create a minimal chat database with some messages."""
db_path = tmp_path / "chat.db"
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute("""
CREATE TABLE chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'browser'
)
""")
now = datetime.now(UTC)
messages = [
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
]
conn.executemany(
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
messages,
)
conn.commit()
return db_path
# ── Idle detection ─────────────────────────────────────────────────────────────
class TestIdleDetection:
def test_not_idle_immediately(self, engine):
assert engine.is_idle() is False
def test_idle_after_threshold(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is True
def test_not_idle_when_threshold_zero(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 0
assert engine.is_idle() is False
def test_record_activity_resets_timer(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
engine.record_activity()
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is False
# ── Status dict ───────────────────────────────────────────────────────────────
class TestGetStatus:
def test_status_shape(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
status = engine.get_status()
assert "enabled" in status
assert "dreaming" in status
assert "idle" in status
assert "dream_count" in status
assert "idle_minutes" in status
def test_dream_count_starts_at_zero(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.get_status()["dream_count"] == 0
# ── Session grouping ──────────────────────────────────────────────────────────
class TestGroupIntoSessions:
def test_single_session(self, engine):
now = datetime.now(UTC)
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 1
assert len(sessions[0]) == 2
def test_splits_on_large_gap(self, engine):
now = datetime.now(UTC)
gap = _SESSION_GAP_SECONDS + 100
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 2
def test_empty_input(self, engine):
assert engine._group_into_sessions([]) == []
# ── Dream storage ─────────────────────────────────────────────────────────────
class TestDreamStorage:
def test_store_and_retrieve(self, engine):
dream = engine._store_dream(
session_excerpt="User asked about Python.",
decision_point="Python uses try/except blocks.",
simulation="I could have given a code example.",
proposed_rule="When explaining errors, include a code snippet.",
)
assert dream.id
assert dream.proposed_rule == "When explaining errors, include a code snippet."
retrieved = engine.get_recent_dreams(limit=1)
assert len(retrieved) == 1
assert retrieved[0].id == dream.id
def test_count_increments(self, engine):
assert engine.count_dreams() == 0
engine._store_dream(
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
)
assert engine.count_dreams() == 1
# ── dream_once integration ─────────────────────────────────────────────────────
class TestDreamOnce:
@pytest.mark.asyncio
async def test_skips_when_disabled(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = False
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_not_idle(self, engine):
engine._last_activity_time = datetime.now(UTC)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 60
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_already_dreaming(self, engine):
engine._is_dreaming = True
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 0
result = await engine.dream_once()
# Reset for cleanliness
engine._is_dreaming = False
assert result is None
@pytest.mark.asyncio
async def test_dream_produces_record_when_idle(self, engine, chat_db):
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
with (
patch("timmy.dreaming.settings") as mock_settings,
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
patch("infrastructure.chat_store.DB_PATH", chat_db),
):
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
mock_settings.dreaming_timeout_seconds = 30
mock_agent.side_effect = [
"I could have provided a concrete try/except example.", # simulation
"When explaining errors, always include a runnable code snippet.", # rule
]
result = await engine.dream_once()
assert result is not None
assert isinstance(result, DreamRecord)
assert result.simulation
assert result.proposed_rule
assert engine.count_dreams() == 1