Compare commits

..

36 Commits

Author SHA1 Message Date
kimi
75a6a498b4 fix: use word-boundary regex for sensitive pattern matching to avoid false positives on max_tokens
The _SENSITIVE_PATTERNS list used simple substring matching, so "token"
matched "max_tokens", causing the distillation pipeline to block facts
about max_tokens parameters. Replaced with compiled regexes using
lookaround assertions so compound terms like max_tokens and num_tokens
are no longer falsely flagged.

Fixes #625

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 16:35:37 -04:00
84302aedac fix: pass max_tokens to Ollama provider in cascade router (#622)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 16:27:24 -04:00
2c217104db feat: real-time Spark visualization in Mission Control (#615)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 16:22:15 -04:00
7452e8a4f0 fix: add missing tests for Tower route /tower (#621)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 16:22:13 -04:00
9732c80892 feat: Real-time Spark Visualization in Tower Dashboard (#612)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 16:10:42 -04:00
f3b3d1e648 [loop-cycle-1658] feat: provider health history endpoint (#457) (#611) 2026-03-20 16:09:20 -04:00
4ba8d25749 feat: Lightning Network integration for tool usage (#610)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 13:07:02 -04:00
2622f0a0fb [loop-cycle-1242] fix: cycle_retro reads cycle_result.json (#603) (#609) 2026-03-20 12:55:01 -04:00
e3d60b89a9 fix: remove model_size kwarg from create_timmy() CLI calls (#606)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:48:49 -04:00
6214ad3225 refactor: extract helpers from run_self_tests() (#601)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:40:44 -04:00
5f5da2163f [loop-cycle] refactor: extract helpers from _handle_tool_confirmation (#592) (#600) 2026-03-20 12:32:24 -04:00
0029c34bb1 refactor: break up search_thoughts() into focused helpers (#597)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:26:51 -04:00
2577b71207 fix: capture thought timestamp at cycle start, not after LLM call (#590)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-20 12:13:48 -04:00
1a8b8ecaed [loop-cycle-1235] refactor: break up _migrate_schema() into focused helpers (#591) (#595) 2026-03-20 12:07:15 -04:00
d821e76589 [loop-cycle-1234] refactor: break up _generate_avatar_image (#563) (#589) 2026-03-20 11:57:53 -04:00
bc010ecfba [loop-cycle-1233] refactor: add docstrings to calm.py route handlers (#569) (#585) 2026-03-20 11:44:06 -04:00
faf6c1a5f1 [loop-cycle-1233] refactor: break up BaseAgent.run() (#561) (#584) 2026-03-20 11:24:36 -04:00
48103bb076 [loop-cycle-956] refactor: break up _handle_message() into focused helpers (#553) (#574) 2026-03-19 21:42:01 -04:00
9f244ffc70 refactor: break up _record_utterance() into focused helpers (#572)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:37:32 -04:00
0162a604be refactor: break up voice_loop.py::run() into focused helpers (#567)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:33:59 -04:00
2326771c5a [loop-cycle-953] refactor: DRY _import_creative_catalogs() (#560) (#565) 2026-03-19 21:21:23 -04:00
8f6cf2681b refactor: break up search_memories() into focused helpers (#557)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:16:07 -04:00
f361893fdd [loop-cycle-951] refactor: break up _migrate_schema() (#552) (#558) 2026-03-19 21:11:02 -04:00
7ad0ee17b6 refactor: break up shell.py::run() into helpers (#551)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:04:10 -04:00
29220b6bdd refactor: break up api_chat() into helpers (#547)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 21:02:04 -04:00
2849dba756 [loop-cycle-948] refactor: break up _gather_system_snapshot() into helpers (#540) (#549) 2026-03-19 20:52:13 -04:00
e11e07f117 [loop-cycle-947] refactor: break up self_reflect() into focused helpers (#505) (#546) 2026-03-19 20:49:18 -04:00
50c8a5428e refactor: break up api_chat() into helpers (#544)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:49:04 -04:00
7da434c85b [loop-cycle-946] refactor: complete airllm removal (#486) (#545) 2026-03-19 20:46:20 -04:00
88e59f7c17 refactor: break up chat_agent() into helpers (#542)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:38:46 -04:00
aa5e9c3176 refactor: break up get_memory_status() into helpers (#537)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:30:29 -04:00
1b4fe65650 fix: cache thinking agent and add timeouts to prevent loop pane death (#535)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:27:25 -04:00
2d69f73d9d fix: add timeout to thinking/loop-QA schedulers (#530)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:18:31 -04:00
ff1e43c235 [loop-cycle-545] fix: queue auto-hygiene — filter closed issues on read (#524) (#529) 2026-03-19 20:10:05 -04:00
b331aa6139 refactor: break up capture_error() into testable helpers (#523)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 20:03:28 -04:00
b45b543f2d refactor: break up create_timmy() into testable helpers (#520)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:51:59 -04:00
48 changed files with 3148 additions and 1769 deletions

View File

@@ -54,6 +54,7 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
EPOCH_COUNTER_FILE = REPO_ROOT / ".loop" / "retro" / ".epoch_counter"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
# How many recent entries to include in rolling summary
SUMMARY_WINDOW = 50
@@ -246,9 +247,37 @@ def update_summary() -> None:
SUMMARY_FILE.write_text(json.dumps(summary, indent=2) + "\n")
def _load_cycle_result() -> dict:
"""Read .loop/cycle_result.json if it exists; return empty dict on failure."""
if not CYCLE_RESULT_FILE.exists():
return {}
try:
raw = CYCLE_RESULT_FILE.read_text().strip()
# Strip hermes fence markers (```json ... ```) if present
if raw.startswith("```"):
lines = raw.splitlines()
lines = [l for l in lines if not l.startswith("```")]
raw = "\n".join(lines)
return json.loads(raw)
except (json.JSONDecodeError, OSError):
return {}
def main() -> None:
args = parse_args()
# Backfill from cycle_result.json when CLI args have defaults
cr = _load_cycle_result()
if cr:
if args.issue is None and cr.get("issue"):
args.issue = int(cr["issue"])
if args.type == "unknown" and cr.get("type"):
args.type = cr["type"]
if args.tests_passed == 0 and cr.get("tests_passed"):
args.tests_passed = int(cr["tests_passed"])
if not args.notes and cr.get("notes"):
args.notes = cr["notes"]
# Auto-detect issue from branch when not explicitly provided
if args.issue is None:
args.issue = detect_issue_from_branch()

View File

@@ -18,13 +18,19 @@ Exit codes:
from __future__ import annotations
import json
import os
import sys
import time
import urllib.request
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Backoff sequence: 60s, 120s, 240s, 600s max
BACKOFF_BASE = 60
@@ -32,19 +38,81 @@ BACKOFF_MAX = 600
BACKOFF_MULTIPLIER = 2
def _get_token() -> str:
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "").strip()
if not token and TOKEN_FILE.exists():
token = TOKEN_FILE.read_text().strip()
return token
def _fetch_open_issue_numbers() -> set[int] | None:
"""Fetch open issue numbers from Gitea. Returns None on failure."""
token = _get_token()
if not token:
return None
try:
numbers: set[int] = set()
page = 1
while True:
url = (
f"{GITEA_API}/repos/{REPO_SLUG}/issues"
f"?state=open&type=issues&limit=50&page={page}"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
if not data:
break
for issue in data:
numbers.add(issue["number"])
if len(data) < 50:
break
page += 1
return numbers
except Exception:
return None
def load_queue() -> list[dict]:
"""Load queue.json and return ready items."""
"""Load queue.json and return ready items, filtering out closed issues."""
if not QUEUE_FILE.exists():
return []
try:
data = json.loads(QUEUE_FILE.read_text())
if isinstance(data, list):
return [item for item in data if item.get("ready")]
return []
if not isinstance(data, list):
return []
ready = [item for item in data if item.get("ready")]
if not ready:
return []
# Filter out issues that are no longer open (auto-hygiene)
open_numbers = _fetch_open_issue_numbers()
if open_numbers is not None:
before = len(ready)
ready = [item for item in ready if item.get("issue") in open_numbers]
removed = before - len(ready)
if removed > 0:
print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
return []
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
"""Rewrite queue.json without closed issues."""
cleaned = [item for item in full_queue if item.get("issue") in open_numbers]
try:
QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
except OSError:
pass
def load_idle_state() -> dict:
"""Load persistent idle state."""
if not IDLE_STATE_FILE.exists():

View File

@@ -254,6 +254,7 @@ class Settings(BaseSettings):
# When enabled, the agent starts an internal thought loop on server start.
thinking_enabled: bool = True
thinking_interval_seconds: int = 300 # 5 minutes between thoughts
thinking_timeout_seconds: int = 120 # max wall-clock time per thinking cycle
thinking_distill_every: int = 10 # distill facts from thoughts every Nth thought
thinking_issue_every: int = 20 # file Gitea issues from thoughts every Nth thought
thinking_memory_check_every: int = 50 # check memory status every Nth thought

View File

@@ -46,6 +46,7 @@ from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import router as world_router
@@ -155,7 +156,17 @@ async def _thinking_scheduler() -> None:
while True:
try:
if settings.thinking_enabled:
await thinking_engine.think_once()
await asyncio.wait_for(
thinking_engine.think_once(),
timeout=settings.thinking_timeout_seconds,
)
except TimeoutError:
logger.warning(
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Thinking scheduler error: %s", exc)
@@ -175,7 +186,10 @@ async def _loop_qa_scheduler() -> None:
while True:
try:
if settings.loop_qa_enabled:
result = await loop_qa_orchestrator.run_next_test()
result = await asyncio.wait_for(
loop_qa_orchestrator.run_next_test(),
timeout=settings.thinking_timeout_seconds,
)
if result:
status = "PASS" if result["success"] else "FAIL"
logger.info(
@@ -184,6 +198,13 @@ async def _loop_qa_scheduler() -> None:
status,
result.get("details", "")[:80],
)
except TimeoutError:
logger.warning(
"Loop QA test timed out after %ds",
settings.thinking_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Loop QA scheduler error: %s", exc)
@@ -563,6 +584,7 @@ app.include_router(system_router)
app.include_router(experiments_router)
app.include_router(db_explorer_router)
app.include_router(world_router)
app.include_router(tower_router)
@app.websocket("/ws")

View File

@@ -71,21 +71,20 @@ async def clear_history(request: Request):
)
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response with native Agno tool confirmation."""
def _validate_message(message: str) -> str:
"""Strip and validate chat input; raise HTTPException on bad input."""
from fastapi import HTTPException
message = message.strip()
if not message:
from fastapi import HTTPException
raise HTTPException(status_code=400, detail="Message cannot be empty")
if len(message) > MAX_MESSAGE_LENGTH:
from fastapi import HTTPException
raise HTTPException(status_code=422, detail="Message too long")
return message
# Record user activity so the thinking engine knows we're not idle
def _record_user_activity() -> None:
"""Notify the thinking engine that the user is active."""
try:
from timmy.thinking import thinking_engine
@@ -93,6 +92,67 @@ async def chat_agent(request: Request, message: str = Form(...)):
except Exception:
logger.debug("Failed to record user input for thinking engine")
def _extract_tool_actions(run_output) -> list[dict]:
"""If Agno paused the run for tool confirmation, build approval items."""
from timmy.approvals import create_item
tool_actions: list[dict] = []
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if not (is_paused and getattr(run_output, "active_requirements", None)):
return tool_actions
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
return tool_actions
def _log_exchange(
message: str, response_text: str | None, error_text: str | None, timestamp: str
) -> None:
"""Append user message and agent/error reply to the in-memory log."""
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
@router.post("/default/chat", response_class=HTMLResponse)
async def chat_agent(request: Request, message: str = Form(...)):
"""Chat — synchronous response with native Agno tool confirmation."""
message = _validate_message(message)
_record_user_activity()
timestamp = datetime.now().strftime("%H:%M:%S")
response_text = None
error_text = None
@@ -104,54 +164,15 @@ async def chat_agent(request: Request, message: str = Form(...)):
error_text = f"Chat error: {exc}"
run_output = None
# Check if Agno paused the run for tool confirmation
tool_actions = []
tool_actions: list[dict] = []
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from timmy.approvals import create_item
item = create_item(
title=f"Dashboard: {tool_name}",
description=format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=get_impact_level(tool_name),
)
_pending_runs[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
}
tool_actions.append(
{
"approval_id": item.id,
"tool_name": tool_name,
"description": format_action_description(tool_name, tool_args),
"impact": get_impact_level(tool_name),
}
)
tool_actions = _extract_tool_actions(run_output)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response_text = _clean_response(raw_content or "")
if not response_text and not tool_actions:
response_text = None # let error template show if needed
response_text = None
message_log.append(role="user", content=message, timestamp=timestamp, source="browser")
if response_text:
message_log.append(
role="agent", content=response_text, timestamp=timestamp, source="browser"
)
elif error_text:
message_log.append(role="error", content=error_text, timestamp=timestamp, source="browser")
_log_exchange(message, response_text, error_text, timestamp)
return templates.TemplateResponse(
request,

View File

@@ -19,14 +19,17 @@ router = APIRouter(tags=["calm"])
# Helper functions for state machine logic
def get_now_task(db: Session) -> Task | None:
"""Return the single active NOW task, or None."""
return db.query(Task).filter(Task.state == TaskState.NOW).first()
def get_next_task(db: Session) -> Task | None:
"""Return the single queued NEXT task, or None."""
return db.query(Task).filter(Task.state == TaskState.NEXT).first()
def get_later_tasks(db: Session) -> list[Task]:
"""Return all LATER tasks ordered by MIT flag then sort_order."""
return (
db.query(Task)
.filter(Task.state == TaskState.LATER)
@@ -36,6 +39,12 @@ def get_later_tasks(db: Session) -> list[Task]:
def promote_tasks(db: Session):
"""Enforce the NOW/NEXT/LATER state machine invariants.
- At most one NOW task (extras demoted to NEXT).
- If no NOW, promote NEXT -> NOW.
- If no NEXT, promote highest-priority LATER -> NEXT.
"""
# Ensure only one NOW task exists. If multiple, demote extras to NEXT.
now_tasks = db.query(Task).filter(Task.state == TaskState.NOW).all()
if len(now_tasks) > 1:
@@ -74,6 +83,7 @@ def promote_tasks(db: Session):
# Endpoints
@router.get("/calm", response_class=HTMLResponse)
async def get_calm_view(request: Request, db: Session = Depends(get_db)):
"""Render the main CALM dashboard with NOW/NEXT/LATER counts."""
now_task = get_now_task(db)
next_task = get_next_task(db)
later_tasks_count = len(get_later_tasks(db))
@@ -90,6 +100,7 @@ async def get_calm_view(request: Request, db: Session = Depends(get_db)):
@router.get("/calm/ritual/morning", response_class=HTMLResponse)
async def get_morning_ritual_form(request: Request):
"""Render the morning ritual intake form."""
return templates.TemplateResponse(request, "calm/morning_ritual_form.html", {})
@@ -102,6 +113,7 @@ async def post_morning_ritual(
mit3_title: str = Form(None),
other_tasks: str = Form(""),
):
"""Process morning ritual: create MITs, other tasks, and set initial states."""
# Create Journal Entry
mit_task_ids = []
journal_entry = JournalEntry(entry_date=date.today())
@@ -173,6 +185,7 @@ async def post_morning_ritual(
@router.get("/calm/ritual/evening", response_class=HTMLResponse)
async def get_evening_ritual_form(request: Request, db: Session = Depends(get_db)):
"""Render the evening ritual form for today's journal entry."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -189,6 +202,7 @@ async def post_evening_ritual(
gratitude: str = Form(None),
energy_level: int = Form(None),
):
"""Process evening ritual: save reflection/gratitude, archive active tasks."""
journal_entry = db.query(JournalEntry).filter(JournalEntry.entry_date == date.today()).first()
if not journal_entry:
raise HTTPException(status_code=404, detail="No journal entry for today")
@@ -223,6 +237,7 @@ async def create_new_task(
is_mit: bool = Form(False),
certainty: TaskCertainty = Form(TaskCertainty.SOFT),
):
"""Create a new task in LATER state and return updated count."""
task = Task(
title=title,
description=description,
@@ -247,6 +262,7 @@ async def start_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Move a task to NOW state, demoting the current NOW to NEXT."""
current_now_task = get_now_task(db)
if current_now_task and current_now_task.id != task_id:
current_now_task.state = TaskState.NEXT # Demote current NOW to NEXT
@@ -281,6 +297,7 @@ async def complete_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Mark a task as DONE and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -309,6 +326,7 @@ async def defer_task(
task_id: int,
db: Session = Depends(get_db),
):
"""Defer a task and trigger state promotion."""
task = db.query(Task).filter(Task.id == task_id).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
@@ -333,6 +351,7 @@ async def defer_task(
@router.get("/calm/partials/later_tasks_list", response_class=HTMLResponse)
async def get_later_tasks_list(request: Request, db: Session = Depends(get_db)):
"""Render the expandable list of LATER tasks."""
later_tasks = get_later_tasks(db)
return templates.TemplateResponse(
"calm/partials/later_tasks_list.html",
@@ -348,6 +367,7 @@ async def reorder_tasks(
later_task_ids: str = Form(""),
next_task_id: int | None = Form(None),
):
"""Reorder LATER tasks and optionally promote one to NEXT."""
# Reorder LATER tasks
if later_task_ids:
ids_in_order = [int(x.strip()) for x in later_task_ids.split(",") if x.strip()]

View File

@@ -31,6 +31,93 @@ _UPLOAD_DIR = str(Path(settings.repo_root) / "data" / "chat-uploads")
_MAX_UPLOAD_SIZE = 50 * 1024 * 1024 # 50 MB
# ── POST /api/chat — helpers ─────────────────────────────────────────────────
async def _parse_chat_body(request: Request) -> tuple[dict | None, JSONResponse | None]:
"""Parse and validate the JSON request body.
Returns (body, None) on success or (None, error_response) on failure.
"""
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return None, JSONResponse(status_code=413, content={"error": "Request body too large"})
try:
body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return None, JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return None, JSONResponse(status_code=400, content={"error": "messages array is required"})
return body, None
def _extract_user_message(messages: list[dict]) -> str | None:
"""Return the text of the last user message, or *None* if absent."""
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
return " ".join(text_parts).strip() or None
text = str(content).strip()
return text or None
return None
def _build_context_prefix() -> str:
"""Build the system-context preamble injected before the user message."""
now = datetime.now()
return (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
def _notify_thinking_engine() -> None:
"""Record user activity so the thinking engine knows we're not idle."""
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
logger.debug("Failed to record user input for thinking engine")
async def _process_chat(user_msg: str) -> dict | JSONResponse:
"""Send *user_msg* to the agent, log the exchange, and return a response."""
_notify_thinking_engine()
timestamp = datetime.now().strftime("%H:%M:%S")
try:
response_text = await agent_chat(
_build_context_prefix() + user_msg,
session_id="mobile",
)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
# ── POST /api/chat ────────────────────────────────────────────────────────────
@@ -44,78 +131,15 @@ async def api_chat(request: Request):
Response:
{"reply": "...", "timestamp": "HH:MM:SS"}
"""
# Enforce request body size limit
content_length = request.headers.get("content-length")
if content_length and int(content_length) > settings.chat_api_max_body_bytes:
return JSONResponse(status_code=413, content={"error": "Request body too large"})
body, err = await _parse_chat_body(request)
if err:
return err
try:
body = await request.json()
except Exception as exc:
logger.warning("Chat API JSON parse error: %s", exc)
return JSONResponse(status_code=400, content={"error": "Invalid JSON"})
messages = body.get("messages")
if not messages or not isinstance(messages, list):
return JSONResponse(status_code=400, content={"error": "messages array is required"})
# Extract the latest user message text
last_user_msg = None
for msg in reversed(messages):
if msg.get("role") == "user":
content = msg.get("content", "")
# Handle multimodal content arrays — extract text parts
if isinstance(content, list):
text_parts = [
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
]
last_user_msg = " ".join(text_parts).strip()
else:
last_user_msg = str(content).strip()
break
if not last_user_msg:
user_msg = _extract_user_message(body["messages"])
if not user_msg:
return JSONResponse(status_code=400, content={"error": "No user message found"})
# Record user activity so the thinking engine knows we're not idle
try:
from timmy.thinking import thinking_engine
thinking_engine.record_user_input()
except Exception:
logger.debug("Failed to record user input for thinking engine")
timestamp = datetime.now().strftime("%H:%M:%S")
try:
# Inject context (same pattern as the HTMX chat handler in agents.py)
now = datetime.now()
context_prefix = (
f"[System: Current date/time is "
f"{now.strftime('%A, %B %d, %Y at %I:%M %p')}]\n"
f"[System: Mobile client]\n\n"
)
response_text = await agent_chat(
context_prefix + last_user_msg,
session_id="mobile",
)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="agent", content=response_text, timestamp=timestamp, source="api")
return {"reply": response_text, "timestamp": timestamp}
except Exception as exc:
error_msg = f"Agent is offline: {exc}"
logger.error("api_chat error: %s", exc)
message_log.append(role="user", content=last_user_msg, timestamp=timestamp, source="api")
message_log.append(role="error", content=error_msg, timestamp=timestamp, source="api")
return JSONResponse(
status_code=503,
content={"error": error_msg, "timestamp": timestamp},
)
return await _process_chat(user_msg)
# ── POST /api/upload ──────────────────────────────────────────────────────────

View File

@@ -16,52 +16,11 @@ router = APIRouter(tags=["system"])
@router.get("/lightning/ledger", response_class=HTMLResponse)
async def lightning_ledger(request: Request):
"""Ledger and balance page."""
# Mock data for now, as this seems to be a UI-first feature
balance = {
"available_sats": 1337,
"incoming_total_sats": 2000,
"outgoing_total_sats": 663,
"fees_paid_sats": 5,
"net_sats": 1337,
"pending_incoming_sats": 0,
"pending_outgoing_sats": 0,
}
"""Ledger and balance page backed by the in-memory Lightning ledger."""
from lightning.ledger import get_balance, get_transactions
# Mock transactions
from collections import namedtuple
from enum import Enum
class TxType(Enum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(Enum):
completed = "completed"
pending = "pending"
Tx = namedtuple(
"Tx", ["tx_type", "status", "amount_sats", "payment_hash", "memo", "created_at"]
)
transactions = [
Tx(
TxType.outgoing,
TxStatus.completed,
50,
"hash1",
"Model inference",
"2026-03-04 10:00:00",
),
Tx(
TxType.incoming,
TxStatus.completed,
1000,
"hash2",
"Manual deposit",
"2026-03-03 15:00:00",
),
]
balance = get_balance()
transactions = get_transactions()
return templates.TemplateResponse(
request,
@@ -70,7 +29,7 @@ async def lightning_ledger(request: Request):
"balance": balance,
"transactions": transactions,
"tx_types": ["incoming", "outgoing"],
"tx_statuses": ["completed", "pending"],
"tx_statuses": ["pending", "settled", "failed", "expired"],
"filter_type": None,
"filter_status": None,
"stats": {},

View File

@@ -0,0 +1,108 @@
"""Tower dashboard — real-time Spark visualization via WebSocket.
GET /tower — HTML Tower dashboard (Thinking / Predicting / Advising)
WS /tower/ws — WebSocket stream of Spark engine state updates
"""
import asyncio
import json
import logging
from fastapi import APIRouter, Request, WebSocket
from fastapi.responses import HTMLResponse
from dashboard.templating import templates
from spark.engine import spark_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tower", tags=["tower"])
_PUSH_INTERVAL = 5 # seconds between state broadcasts
def _spark_snapshot() -> dict:
"""Build a JSON-serialisable snapshot of Spark state."""
status = spark_engine.status()
timeline = spark_engine.get_timeline(limit=10)
events = []
for ev in timeline:
entry = {
"event_type": ev.event_type,
"description": ev.description,
"importance": ev.importance,
"created_at": ev.created_at,
}
if ev.agent_id:
entry["agent_id"] = ev.agent_id[:8]
if ev.task_id:
entry["task_id"] = ev.task_id[:8]
try:
entry["data"] = json.loads(ev.data)
except (json.JSONDecodeError, TypeError):
entry["data"] = {}
events.append(entry)
predictions = spark_engine.get_predictions(limit=5)
preds = []
for p in predictions:
pred = {
"task_id": p.task_id[:8] if p.task_id else "?",
"accuracy": p.accuracy,
"evaluated": p.evaluated_at is not None,
"created_at": p.created_at,
}
try:
pred["predicted"] = json.loads(p.predicted_value)
except (json.JSONDecodeError, TypeError):
pred["predicted"] = {}
preds.append(pred)
advisories = spark_engine.get_advisories()
advs = [
{
"category": a.category,
"priority": a.priority,
"title": a.title,
"detail": a.detail,
"suggested_action": a.suggested_action,
}
for a in advisories
]
return {
"type": "spark_state",
"status": status,
"events": events,
"predictions": preds,
"advisories": advs,
}
@router.get("", response_class=HTMLResponse)
async def tower_ui(request: Request):
"""Render the Tower dashboard page."""
snapshot = _spark_snapshot()
return templates.TemplateResponse(
request,
"tower.html",
{"snapshot": snapshot},
)
@router.websocket("/ws")
async def tower_ws(websocket: WebSocket) -> None:
"""Stream Spark state snapshots to the Tower dashboard."""
await websocket.accept()
logger.info("Tower WS connected")
try:
# Send initial snapshot
await websocket.send_text(json.dumps(_spark_snapshot()))
while True:
await asyncio.sleep(_PUSH_INTERVAL)
await websocket.send_text(json.dumps(_spark_snapshot()))
except Exception:
logger.debug("Tower WS disconnected")

View File

@@ -138,6 +138,47 @@
</div>
</div>
<!-- Spark Intelligence -->
{% from "macros.html" import panel %}
<div class="mc-card-spaced">
<div class="card">
<div class="card-header">
<h2 class="card-title">Spark Intelligence</h2>
<div>
<span class="badge" id="spark-status-badge">Loading...</span>
</div>
</div>
<div class="grid grid-3">
<div class="stat">
<div class="stat-value" id="spark-events">-</div>
<div class="stat-label">Events</div>
</div>
<div class="stat">
<div class="stat-value" id="spark-memories">-</div>
<div class="stat-label">Memories</div>
</div>
<div class="stat">
<div class="stat-value" id="spark-predictions">-</div>
<div class="stat-label">Predictions</div>
</div>
</div>
</div>
<div class="grid grid-2 mc-section-gap">
{% call panel("SPARK TIMELINE", id="spark-timeline-panel",
hx_get="/spark/timeline",
hx_trigger="load, every 10s") %}
<div class="spark-timeline-scroll">
<p class="chat-history-placeholder">Loading timeline...</p>
</div>
{% endcall %}
{% call panel("SPARK INSIGHTS", id="spark-insights-panel",
hx_get="/spark/insights",
hx_trigger="load, every 30s") %}
<p class="chat-history-placeholder">Loading insights...</p>
{% endcall %}
</div>
</div>
<!-- Chat History -->
<div class="card mc-card-spaced">
<div class="card-header">
@@ -428,7 +469,34 @@ async function loadGrokStats() {
}
}
// Load Spark status
async function loadSparkStatus() {
try {
var response = await fetch('/spark');
var data = await response.json();
var st = data.status || {};
document.getElementById('spark-events').textContent = st.total_events || 0;
document.getElementById('spark-memories').textContent = st.total_memories || 0;
document.getElementById('spark-predictions').textContent = st.total_predictions || 0;
var badge = document.getElementById('spark-status-badge');
if (st.total_events > 0) {
badge.textContent = 'Active';
badge.className = 'badge badge-success';
} else {
badge.textContent = 'Idle';
badge.className = 'badge badge-warning';
}
} catch (error) {
var badge = document.getElementById('spark-status-badge');
badge.textContent = 'Offline';
badge.className = 'badge badge-danger';
}
}
// Initial load
loadSparkStatus();
loadSovereignty();
loadHealth();
loadSwarmStats();
@@ -442,5 +510,6 @@ setInterval(loadHealth, 10000);
setInterval(loadSwarmStats, 5000);
setInterval(updateHeartbeat, 5000);
setInterval(loadGrokStats, 10000);
setInterval(loadSparkStatus, 15000);
</script>
{% endblock %}

View File

@@ -0,0 +1,180 @@
{% extends "base.html" %}
{% block title %}Timmy Time — Tower{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid tower-container py-3">
<div class="tower-header">
<div class="tower-title">TOWER</div>
<div class="tower-subtitle">
Real-time Spark visualization &mdash;
<span id="tower-conn" class="tower-conn-badge tower-conn-connecting">CONNECTING</span>
</div>
</div>
<div class="row g-3">
<!-- Left: THINKING (events) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-thinking">// THINKING</div>
<div class="card-body p-3 tower-scroll" id="tower-events">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
<!-- Middle: PREDICTING (EIDOS) -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-predicting">// PREDICTING</div>
<div class="card-body p-3" id="tower-predictions">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
<div class="card mc-panel">
<div class="card-header mc-panel-header">// EIDOS STATS</div>
<div class="card-body p-3">
<div class="tower-stat-grid" id="tower-stats">
<div class="tower-stat"><span class="tower-stat-label">EVENTS</span><span class="tower-stat-value" id="ts-events">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">MEMORIES</span><span class="tower-stat-value" id="ts-memories">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">PREDICTIONS</span><span class="tower-stat-value" id="ts-preds">0</span></div>
<div class="tower-stat"><span class="tower-stat-label">ACCURACY</span><span class="tower-stat-value" id="ts-accuracy"></span></div>
</div>
</div>
</div>
</div>
<!-- Right: ADVISING -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<div class="card mc-panel tower-phase-card">
<div class="card-header mc-panel-header tower-phase-advising">// ADVISING</div>
<div class="card-body p-3 tower-scroll" id="tower-advisories">
<div class="tower-empty">Waiting for Spark data&hellip;</div>
</div>
</div>
</div>
</div>
</div>
<script>
(function() {
var ws = null;
var badge = document.getElementById('tower-conn');
function setConn(state) {
badge.textContent = state.toUpperCase();
badge.className = 'tower-conn-badge tower-conn-' + state;
}
function esc(s) { var d = document.createElement('div'); d.textContent = s; return d.innerHTML; }
function renderEvents(events) {
var el = document.getElementById('tower-events');
if (!events || !events.length) { el.innerHTML = '<div class="tower-empty">No events captured yet.</div>'; return; }
var html = '';
for (var i = 0; i < events.length; i++) {
var ev = events[i];
var dots = ev.importance >= 0.8 ? '\u25cf\u25cf\u25cf' : ev.importance >= 0.5 ? '\u25cf\u25cf' : '\u25cf';
html += '<div class="tower-event tower-etype-' + esc(ev.event_type) + '">'
+ '<div class="tower-ev-head">'
+ '<span class="tower-ev-badge">' + esc(ev.event_type.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-ev-dots">' + dots + '</span>'
+ '</div>'
+ '<div class="tower-ev-desc">' + esc(ev.description) + '</div>'
+ '<div class="tower-ev-time">' + esc((ev.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderPredictions(preds) {
var el = document.getElementById('tower-predictions');
if (!preds || !preds.length) { el.innerHTML = '<div class="tower-empty">No predictions yet.</div>'; return; }
var html = '';
for (var i = 0; i < preds.length; i++) {
var p = preds[i];
var cls = p.evaluated ? 'tower-pred-done' : 'tower-pred-pending';
var accTxt = p.accuracy != null ? Math.round(p.accuracy * 100) + '%' : 'PENDING';
var accCls = p.accuracy != null ? (p.accuracy >= 0.7 ? 'text-success' : p.accuracy < 0.4 ? 'text-danger' : 'text-warning') : '';
html += '<div class="tower-pred ' + cls + '">'
+ '<div class="tower-pred-head">'
+ '<span class="tower-pred-task">' + esc(p.task_id) + '</span>'
+ '<span class="tower-pred-acc ' + accCls + '">' + accTxt + '</span>'
+ '</div>';
if (p.predicted) {
var pr = p.predicted;
html += '<div class="tower-pred-detail">';
if (pr.likely_winner) html += '<span>Winner: ' + esc(pr.likely_winner.slice(0, 8)) + '</span> ';
if (pr.success_probability != null) html += '<span>Success: ' + Math.round(pr.success_probability * 100) + '%</span> ';
html += '</div>';
}
html += '<div class="tower-ev-time">' + esc((p.created_at || '').slice(0, 19)) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderAdvisories(advs) {
var el = document.getElementById('tower-advisories');
if (!advs || !advs.length) { el.innerHTML = '<div class="tower-empty">No advisories yet.</div>'; return; }
var html = '';
for (var i = 0; i < advs.length; i++) {
var a = advs[i];
var prio = a.priority >= 0.7 ? 'high' : a.priority >= 0.4 ? 'medium' : 'low';
html += '<div class="tower-advisory tower-adv-' + prio + '">'
+ '<div class="tower-adv-head">'
+ '<span class="tower-adv-cat">' + esc(a.category.replace(/_/g, ' ').toUpperCase()) + '</span>'
+ '<span class="tower-adv-prio">' + Math.round(a.priority * 100) + '%</span>'
+ '</div>'
+ '<div class="tower-adv-title">' + esc(a.title) + '</div>'
+ '<div class="tower-adv-detail">' + esc(a.detail) + '</div>'
+ '<div class="tower-adv-action">' + esc(a.suggested_action) + '</div>'
+ '</div>';
}
el.innerHTML = html;
}
function renderStats(status) {
if (!status) return;
document.getElementById('ts-events').textContent = status.events_captured || 0;
document.getElementById('ts-memories').textContent = status.memories_stored || 0;
var p = status.predictions || {};
document.getElementById('ts-preds').textContent = p.total_predictions || 0;
var acc = p.avg_accuracy;
var accEl = document.getElementById('ts-accuracy');
if (acc != null) {
accEl.textContent = Math.round(acc * 100) + '%';
accEl.className = 'tower-stat-value ' + (acc >= 0.7 ? 'text-success' : acc < 0.4 ? 'text-danger' : 'text-warning');
} else {
accEl.textContent = '\u2014';
}
}
function handleMsg(data) {
if (data.type !== 'spark_state') return;
renderEvents(data.events);
renderPredictions(data.predictions);
renderAdvisories(data.advisories);
renderStats(data.status);
}
function connect() {
var proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
ws = new WebSocket(proto + '//' + location.host + '/tower/ws');
ws.onopen = function() { setConn('live'); };
ws.onclose = function() { setConn('offline'); setTimeout(connect, 3000); };
ws.onerror = function() { setConn('offline'); };
ws.onmessage = function(e) {
try { handleMsg(JSON.parse(e.data)); } catch(err) { console.error('Tower WS parse error', err); }
};
}
connect();
})();
</script>
{% endblock %}

View File

@@ -100,36 +100,14 @@ def _get_git_context() -> dict:
return {"branch": "unknown", "commit": "unknown"}
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
def _extract_traceback_info(exc: Exception) -> tuple[str, str, int]:
"""Extract formatted traceback, affected file, and line number.
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
Tuple of (traceback_string, affected_file, affected_line).
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
# Format the stack trace
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
# Extract file/line from traceback
tb_obj = exc.__traceback__
affected_file = "unknown"
affected_line = 0
@@ -139,9 +117,18 @@ def capture_error(
affected_file = tb_obj.tb_frame.f_code.co_filename
affected_line = tb_obj.tb_lineno
git_ctx = _get_git_context()
return tb_str, affected_file, affected_line
# 1. Log to event_log
def _log_error_event(
exc: Exception,
source: str,
error_hash: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> None:
"""Log the captured error to the event log."""
try:
from swarm.event_log import EventType, log_event
@@ -161,8 +148,18 @@ def capture_error(
except Exception as log_exc:
logger.debug("Failed to log error event: %s", log_exc)
# 2. Create bug report task
task_id = None
def _create_bug_report(
exc: Exception,
source: str,
context: dict | None,
error_hash: str,
tb_str: str,
affected_file: str,
affected_line: int,
git_ctx: dict,
) -> str | None:
"""Create a bug report task and return the task ID (or None on failure)."""
try:
from swarm.task_queue.models import create_task
@@ -195,7 +192,6 @@ def capture_error(
)
task_id = task.id
# Log the creation event
try:
from swarm.event_log import EventType, log_event
@@ -210,12 +206,16 @@ def capture_error(
)
except Exception as exc:
logger.warning("Bug report screenshot error: %s", exc)
pass
return task_id
except Exception as task_exc:
logger.debug("Failed to create bug report task: %s", task_exc)
return None
# 3. Send notification
def _notify_bug_report(exc: Exception, source: str) -> None:
"""Send a push notification about the captured error."""
try:
from infrastructure.notifications.push import notifier
@@ -224,11 +224,12 @@ def capture_error(
message=f"{type(exc).__name__} in {source}: {str(exc)[:80]}",
category="system",
)
except Exception as exc:
logger.warning("Bug report notification error: %s", exc)
pass
except Exception as notify_exc:
logger.warning("Bug report notification error: %s", notify_exc)
# 4. Record in session logger (via registered callback)
def _record_to_session(exc: Exception, source: str) -> None:
"""Record the error via the registered session callback."""
if _error_recorder is not None:
try:
_error_recorder(
@@ -238,4 +239,50 @@ def capture_error(
except Exception as log_exc:
logger.warning("Bug report session logging error: %s", log_exc)
def capture_error(
exc: Exception,
source: str = "unknown",
context: dict | None = None,
) -> str | None:
"""Capture an error and optionally create a bug report.
Args:
exc: The exception to capture
source: Module/component where the error occurred
context: Optional dict of extra context (request path, etc.)
Returns:
Task ID of the created bug report, or None if deduplicated/disabled
"""
from config import settings
if not settings.error_feedback_enabled:
return None
error_hash = _stack_hash(exc)
if _is_duplicate(error_hash):
logger.debug("Duplicate error suppressed: %s (hash=%s)", exc, error_hash)
return None
tb_str, affected_file, affected_line = _extract_traceback_info(exc)
git_ctx = _get_git_context()
_log_error_event(exc, source, error_hash, affected_file, affected_line, git_ctx)
task_id = _create_bug_report(
exc,
source,
context,
error_hash,
tb_str,
affected_file,
affected_line,
git_ctx,
)
_notify_bug_report(exc, source)
_record_to_session(exc, source)
return task_id

View File

@@ -144,6 +144,65 @@ class ShellHand:
return None
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into a copy of the current environment."""
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
return run_env
async def _execute_subprocess(
self,
command: str,
effective_timeout: int,
cwd: str | None,
run_env: dict,
start: float,
) -> ShellResult:
"""Run *command* as a subprocess with timeout enforcement."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
async def run(
self,
command: str,
@@ -164,7 +223,6 @@ class ShellHand:
"""
start = time.time()
# Validate
validation_error = self._validate_command(command)
if validation_error:
return ShellResult(
@@ -178,52 +236,8 @@ class ShellHand:
cwd = working_dir or self._working_dir
try:
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
run_env = self._build_run_env(env)
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc)

View File

@@ -2,6 +2,7 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .history import HealthHistoryStore, get_history_store
__all__ = [
"CascadeRouter",
@@ -9,4 +10,6 @@ __all__ = [
"ProviderStatus",
"get_router",
"router",
"HealthHistoryStore",
"get_history_store",
]

View File

@@ -8,6 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from .cascade import CascadeRouter, get_router
from .history import HealthHistoryStore, get_history_store
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/router", tags=["router"])
@@ -199,6 +200,17 @@ async def reload_config(
raise HTTPException(status_code=500, detail=f"Reload failed: {exc}") from exc
@router.get("/history")
async def get_history(
hours: int = 24,
store: Annotated[HealthHistoryStore, Depends(get_history_store)] = None,
) -> list[dict[str, Any]]:
"""Get provider health history for the last N hours."""
if store is None:
store = get_history_store()
return store.get_history(hours=hours)
@router.get("/config")
async def get_config(
cascade: Annotated[CascadeRouter, Depends(get_cascade_router)],

View File

@@ -564,6 +564,7 @@ class CascadeRouter:
messages=messages,
model=model or provider.get_default_model(),
temperature=temperature,
max_tokens=max_tokens,
content_type=content_type,
)
elif provider.type == "openai":
@@ -604,6 +605,7 @@ class CascadeRouter:
messages: list[dict],
model: str,
temperature: float,
max_tokens: int | None = None,
content_type: ContentType = ContentType.TEXT,
) -> dict:
"""Call Ollama API with multi-modal support."""
@@ -614,13 +616,15 @@ class CascadeRouter:
# Transform messages for Ollama format (including images)
transformed_messages = self._transform_messages_for_ollama(messages)
options = {"temperature": temperature}
if max_tokens:
options["num_predict"] = max_tokens
payload = {
"model": model,
"messages": transformed_messages,
"stream": False,
"options": {
"temperature": temperature,
},
"options": options,
}
timeout = aiohttp.ClientTimeout(total=self.config.timeout_seconds)

View File

@@ -0,0 +1,152 @@
"""Provider health history — time-series snapshots for dashboard visualization."""
import asyncio
import logging
import sqlite3
from datetime import UTC, datetime, timedelta
from pathlib import Path
logger = logging.getLogger(__name__)
_store: "HealthHistoryStore | None" = None
class HealthHistoryStore:
"""Stores timestamped provider health snapshots in SQLite."""
def __init__(self, db_path: str = "data/router_history.db") -> None:
self.db_path = db_path
if db_path != ":memory:":
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._init_schema()
self._bg_task: asyncio.Task | None = None
def _init_schema(self) -> None:
self._conn.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
provider_name TEXT NOT NULL,
status TEXT NOT NULL,
error_rate REAL NOT NULL,
avg_latency_ms REAL NOT NULL,
circuit_state TEXT NOT NULL,
total_requests INTEGER NOT NULL
)
""")
self._conn.execute("""
CREATE INDEX IF NOT EXISTS idx_snapshots_ts
ON snapshots(timestamp)
""")
self._conn.commit()
def record_snapshot(self, providers: list[dict]) -> None:
"""Record a health snapshot for all providers."""
ts = datetime.now(UTC).isoformat()
rows = [
(
ts,
p["name"],
p["status"],
p["error_rate"],
p["avg_latency_ms"],
p["circuit_state"],
p["total_requests"],
)
for p in providers
]
self._conn.executemany(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
rows,
)
self._conn.commit()
def get_history(self, hours: int = 24) -> list[dict]:
"""Return snapshots from the last N hours, grouped by timestamp."""
cutoff = (datetime.now(UTC) - timedelta(hours=hours)).isoformat()
rows = self._conn.execute(
"""SELECT timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests
FROM snapshots WHERE timestamp >= ? ORDER BY timestamp""",
(cutoff,),
).fetchall()
# Group by timestamp
snapshots: dict[str, list[dict]] = {}
for row in rows:
ts = row["timestamp"]
if ts not in snapshots:
snapshots[ts] = []
snapshots[ts].append(
{
"name": row["provider_name"],
"status": row["status"],
"error_rate": row["error_rate"],
"avg_latency_ms": row["avg_latency_ms"],
"circuit_state": row["circuit_state"],
"total_requests": row["total_requests"],
}
)
return [{"timestamp": ts, "providers": providers} for ts, providers in snapshots.items()]
def prune(self, keep_hours: int = 168) -> int:
"""Remove snapshots older than keep_hours. Returns rows deleted."""
cutoff = (datetime.now(UTC) - timedelta(hours=keep_hours)).isoformat()
cursor = self._conn.execute("DELETE FROM snapshots WHERE timestamp < ?", (cutoff,))
self._conn.commit()
return cursor.rowcount
def close(self) -> None:
"""Close the database connection."""
if self._bg_task and not self._bg_task.done():
self._bg_task.cancel()
self._conn.close()
def _capture_snapshot(self, cascade_router) -> None: # noqa: ANN001
"""Capture current provider state as a snapshot."""
providers = []
for p in cascade_router.providers:
providers.append(
{
"name": p.name,
"status": p.status.value,
"error_rate": round(p.metrics.error_rate, 4),
"avg_latency_ms": round(p.metrics.avg_latency_ms, 2),
"circuit_state": p.circuit_state.value,
"total_requests": p.metrics.total_requests,
}
)
self.record_snapshot(providers)
async def start_background_task(
self,
cascade_router,
interval_seconds: int = 60, # noqa: ANN001
) -> None:
"""Start periodic snapshot capture."""
async def _loop() -> None:
while True:
try:
self._capture_snapshot(cascade_router)
logger.debug("Recorded health snapshot")
except Exception:
logger.exception("Failed to record health snapshot")
await asyncio.sleep(interval_seconds)
self._bg_task = asyncio.create_task(_loop())
logger.info("Health history background task started (interval=%ds)", interval_seconds)
def get_history_store() -> HealthHistoryStore:
"""Get or create the singleton history store."""
global _store # noqa: PLW0603
if _store is None:
_store = HealthHistoryStore()
return _store

View File

@@ -515,25 +515,36 @@ class DiscordVendor(ChatPlatform):
async def _handle_message(self, message) -> None:
"""Process an incoming message and respond via a thread."""
# Strip the bot mention from the message content
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
content = self._extract_content(message)
if not content:
return
# Create or reuse a thread for this conversation
thread = await self._get_or_create_thread(message)
target = thread or message.channel
session_id = f"discord_{thread.id}" if thread else f"discord_{message.channel.id}"
# Derive session_id for per-conversation history via Agno's SQLite
if thread:
session_id = f"discord_{thread.id}"
else:
session_id = f"discord_{message.channel.id}"
run_output, response = await self._invoke_agent(content, session_id, target)
# Run Timmy agent with typing indicator and timeout
if run_output is not None:
await self._handle_paused_run(run_output, target, session_id)
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
await self._send_response(response, target)
def _extract_content(self, message) -> str:
"""Strip the bot mention and return clean message text."""
content = message.content
if self._client.user:
content = content.replace(f"<@{self._client.user.id}>", "").strip()
return content
async def _invoke_agent(self, content: str, session_id: str, target):
"""Run chat_with_tools with a typing indicator and timeout.
Returns a (run_output, error_response) tuple. On success the
error_response is ``None``; on failure run_output is ``None``.
"""
run_output = None
response = None
try:
@@ -548,51 +559,57 @@ class DiscordVendor(ChatPlatform):
except Exception as exc:
logger.error("Discord: chat_with_tools() failed: %s", exc)
response = "I'm having trouble reaching my inference backend right now. Please try again shortly."
return run_output, response
# Check if Agno paused the run for tool confirmation
if run_output is not None:
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
async def _handle_paused_run(self, run_output, target, session_id: str) -> None:
"""If Agno paused the run for tool confirmation, enqueue approvals."""
status = getattr(run_output, "status", None)
is_paused = status == "PAUSED" or str(status) == "RunStatus.paused"
if is_paused and getattr(run_output, "active_requirements", None):
from config import settings
if not (is_paused and getattr(run_output, "active_requirements", None)):
return
if settings.discord_confirm_actions:
for req in run_output.active_requirements:
if getattr(req, "needs_confirmation", False):
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
from config import settings
from timmy.approvals import create_item
if not settings.discord_confirm_actions:
return
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
for req in run_output.active_requirements:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
raw_content = run_output.content if hasattr(run_output, "content") else ""
response = _clean_response(raw_content or "")
from timmy.approvals import create_item
# Discord has a 2000 character limit — send with error handling
if response and response.strip():
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
item = create_item(
title=f"Discord: {tool_name}",
description=_format_action_description(tool_name, tool_args),
proposed_action=json.dumps({"tool": tool_name, "args": tool_args}),
impact=_get_impact_level(tool_name),
)
self._pending_actions[item.id] = {
"run_output": run_output,
"requirement": req,
"tool_name": tool_name,
"tool_args": tool_args,
"target": target,
"session_id": session_id,
}
await self._send_confirmation(target, tool_name, tool_args, item.id)
@staticmethod
async def _send_response(response: str | None, target) -> None:
"""Send a response to Discord, chunked to the 2000-char limit."""
if not response or not response.strip():
return
for chunk in _chunk_message(response, 2000):
try:
await target.send(chunk)
except Exception as exc:
logger.error("Discord: failed to send message chunk: %s", exc)
break
async def _get_or_create_thread(self, message):
"""Get the active thread for a channel, or create one.

View File

@@ -0,0 +1 @@
"""Lightning Network integration for tool-usage micro-payments."""

69
src/lightning/factory.py Normal file
View File

@@ -0,0 +1,69 @@
"""Lightning backend factory.
Returns a mock or real LND backend based on ``settings.lightning_backend``.
"""
from __future__ import annotations
import hashlib
import logging
import secrets
from dataclasses import dataclass
from config import settings
logger = logging.getLogger(__name__)
@dataclass
class Invoice:
"""Minimal Lightning invoice representation."""
payment_hash: str
payment_request: str
amount_sats: int
memo: str
class MockBackend:
"""In-memory mock Lightning backend for development and testing."""
def create_invoice(self, amount_sats: int, memo: str = "") -> Invoice:
"""Create a fake invoice with a random payment hash."""
raw = secrets.token_bytes(32)
payment_hash = hashlib.sha256(raw).hexdigest()
payment_request = f"lnbc{amount_sats}mock{payment_hash[:20]}"
logger.debug("Mock invoice: %s sats — %s", amount_sats, payment_hash[:12])
return Invoice(
payment_hash=payment_hash,
payment_request=payment_request,
amount_sats=amount_sats,
memo=memo,
)
# Singleton — lazily created
_backend: MockBackend | None = None
def get_backend() -> MockBackend:
"""Return the configured Lightning backend (currently mock-only).
Raises ``ValueError`` if an unsupported backend is requested.
"""
global _backend # noqa: PLW0603
if _backend is not None:
return _backend
kind = settings.lightning_backend
if kind == "mock":
_backend = MockBackend()
elif kind == "lnd":
# LND gRPC integration is on the roadmap — for now fall back to mock.
logger.warning("LND backend not yet implemented — using mock")
_backend = MockBackend()
else:
raise ValueError(f"Unknown lightning_backend: {kind!r}")
logger.info("Lightning backend: %s", kind)
return _backend

146
src/lightning/ledger.py Normal file
View File

@@ -0,0 +1,146 @@
"""In-memory Lightning transaction ledger.
Tracks invoices, settlements, and balances per the schema in
``docs/adr/018-lightning-ledger.md``. Uses a simple in-memory list so the
dashboard can display real (ephemeral) data without requiring SQLite yet.
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
logger = logging.getLogger(__name__)
class TxType(StrEnum):
incoming = "incoming"
outgoing = "outgoing"
class TxStatus(StrEnum):
pending = "pending"
settled = "settled"
failed = "failed"
expired = "expired"
@dataclass
class LedgerEntry:
"""Single ledger row matching the ADR-018 schema."""
id: str
tx_type: TxType
status: TxStatus
payment_hash: str
amount_sats: int
memo: str
source: str
created_at: str
invoice: str = ""
preimage: str = ""
task_id: str = ""
agent_id: str = ""
settled_at: str = ""
fee_sats: int = 0
# ── In-memory store ──────────────────────────────────────────────────
_entries: list[LedgerEntry] = []
def create_invoice_entry(
payment_hash: str,
amount_sats: int,
memo: str = "",
source: str = "tool_usage",
task_id: str = "",
agent_id: str = "",
invoice: str = "",
) -> LedgerEntry:
"""Record a new incoming invoice in the ledger."""
entry = LedgerEntry(
id=uuid.uuid4().hex[:16],
tx_type=TxType.incoming,
status=TxStatus.pending,
payment_hash=payment_hash,
amount_sats=amount_sats,
memo=memo,
source=source,
task_id=task_id,
agent_id=agent_id,
invoice=invoice,
created_at=datetime.now(UTC).isoformat(),
)
_entries.append(entry)
logger.debug("Ledger entry created: %s (%s sats)", entry.id, amount_sats)
return entry
def mark_settled(payment_hash: str, preimage: str = "") -> LedgerEntry | None:
"""Mark a pending entry as settled by payment hash."""
for entry in _entries:
if entry.payment_hash == payment_hash and entry.status == TxStatus.pending:
entry.status = TxStatus.settled
entry.preimage = preimage
entry.settled_at = datetime.now(UTC).isoformat()
logger.debug("Ledger settled: %s", payment_hash[:12])
return entry
return None
def get_balance() -> dict:
"""Compute the current balance from settled and pending entries."""
incoming_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.settled
)
outgoing_total = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.settled
)
fees = sum(e.fee_sats for e in _entries if e.status == TxStatus.settled)
pending_in = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.incoming and e.status == TxStatus.pending
)
pending_out = sum(
e.amount_sats
for e in _entries
if e.tx_type == TxType.outgoing and e.status == TxStatus.pending
)
net = incoming_total - outgoing_total - fees
return {
"incoming_total_sats": incoming_total,
"outgoing_total_sats": outgoing_total,
"fees_paid_sats": fees,
"net_sats": net,
"pending_incoming_sats": pending_in,
"pending_outgoing_sats": pending_out,
"available_sats": net - pending_out,
}
def get_transactions(
tx_type: str | None = None,
status: str | None = None,
limit: int = 50,
) -> list[LedgerEntry]:
"""Return ledger entries, optionally filtered."""
result = _entries
if tx_type:
result = [e for e in result if e.tx_type.value == tx_type]
if status:
result = [e for e in result if e.status.value == status]
return list(reversed(result))[:limit]
def clear() -> None:
"""Reset the ledger (for testing)."""
_entries.clear()

View File

@@ -1 +1 @@
"""Timmy — Core AI agent (Ollama/AirLLM backends, CLI, prompts)."""
"""Timmy — Core AI agent (Ollama/Grok/Claude backends, CLI, prompts)."""

View File

@@ -26,12 +26,12 @@ from timmy.prompts import get_system_prompt
from timmy.tools import create_full_toolkit
if TYPE_CHECKING:
from timmy.backends import ClaudeBackend, GrokBackend, TimmyAirLLMAgent
from timmy.backends import ClaudeBackend, GrokBackend
logger = logging.getLogger(__name__)
# Union type for callers that want to hint the return type.
TimmyAgent = Union[Agent, "TimmyAirLLMAgent", "GrokBackend", "ClaudeBackend"]
TimmyAgent = Union[Agent, "GrokBackend", "ClaudeBackend"]
# Models known to be too small for reliable tool calling.
# These hallucinate tool calls as text, invoke tools randomly,
@@ -172,47 +172,29 @@ def _warmup_model(model_name: str) -> bool:
def _resolve_backend(requested: str | None) -> str:
"""Return the backend name to use, resolving 'auto' and explicit overrides.
"""Return the backend name to use.
Priority (highest lowest):
Priority (highest -> lowest):
1. CLI flag passed directly to create_timmy()
2. TIMMY_MODEL_BACKEND env var / .env setting
3. 'ollama' (safe default no surprises)
'auto' triggers Apple Silicon detection: uses AirLLM if both
is_apple_silicon() and airllm_available() return True.
3. 'ollama' (safe default -- no surprises)
"""
if requested is not None:
return requested
configured = settings.timmy_model_backend # "ollama" | "airllm" | "grok" | "claude" | "auto"
if configured != "auto":
return configured
# "auto" path — lazy import to keep startup fast and tests clean.
from timmy.backends import airllm_available, is_apple_silicon
if is_apple_silicon() and airllm_available():
return "airllm"
return "ollama"
return settings.timmy_model_backend # "ollama" | "grok" | "claude"
def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
"""Build the Agno tools list (toolkit + optional MCP servers).
def _build_tools_list(use_tools: bool, skip_mcp: bool, model_name: str) -> list:
"""Assemble the tools list based on model capability and MCP flags.
Args:
use_tools: Whether the model supports tool calling.
skip_mcp: If True, omit MCP tool servers.
Returns:
List of Toolkit / MCPTools, possibly empty.
Returns a list of Toolkit / MCPTools objects, or an empty list.
"""
if not use_tools:
logger.info("Tools disabled (model too small for reliable tool calling)")
logger.info("Tools disabled for model %s (too small for reliable tool calling)", model_name)
return []
toolkit = create_full_toolkit()
tools_list: list = [toolkit]
tools_list: list = [create_full_toolkit()]
# Add MCP tool servers (lazy-connected on first arun()).
# Skipped when skip_mcp=True — MCP's stdio transport uses anyio cancel
@@ -235,15 +217,7 @@ def _build_tools_list(use_tools: bool, skip_mcp: bool) -> list:
def _build_prompt(use_tools: bool, session_id: str) -> str:
"""Build the full system prompt with optional memory context.
Args:
use_tools: Whether tools are enabled (affects prompt tier and context budget).
session_id: Session identifier for the prompt.
Returns:
Complete system prompt string.
"""
"""Build the full system prompt with optional memory context."""
base_prompt = get_system_prompt(tools_enabled=use_tools, session_id=session_id)
try:
@@ -251,8 +225,7 @@ def _build_prompt(use_tools: bool, session_id: str) -> str:
memory_context = memory_system.get_system_context()
if memory_context:
# Truncate if too long — smaller budget for small models
# since the expanded prompt (roster, guardrails) uses more tokens
# Smaller budget for small models — expanded prompt uses more tokens
max_context = 2000 if not use_tools else 8000
if len(memory_context) > max_context:
memory_context = memory_context[:max_context] + "\n... [truncated]"
@@ -268,29 +241,19 @@ def _build_prompt(use_tools: bool, session_id: str) -> str:
def _create_ollama_agent(
model_name: str,
*,
db_file: str,
model_name: str,
tools_list: list,
full_prompt: str,
use_tools: bool,
) -> Agent:
"""Construct the Agno Agent with an Ollama model.
Args:
model_name: Resolved Ollama model name.
db_file: SQLite file for conversation memory.
tools_list: Pre-built tools list (may be empty).
full_prompt: Complete system prompt.
use_tools: Whether tools are enabled.
Returns:
Configured Agno Agent.
"""
"""Construct the Agno Agent with Ollama backend and warm up the model."""
model_kwargs = {}
if settings.ollama_num_ctx > 0:
model_kwargs["options"] = {"num_ctx": settings.ollama_num_ctx}
return Agent(
agent = Agent(
name="Agent",
model=Ollama(id=model_name, host=settings.ollama_url, timeout=300, **model_kwargs),
db=SqliteDb(db_file=db_file),
@@ -302,22 +265,22 @@ def _create_ollama_agent(
tool_call_limit=settings.max_agent_steps if use_tools else None,
telemetry=settings.telemetry_enabled,
)
_warmup_model(model_name)
return agent
def create_timmy(
db_file: str = "timmy.db",
backend: str | None = None,
model_size: str | None = None,
*,
skip_mcp: bool = False,
session_id: str = "unknown",
) -> TimmyAgent:
"""Instantiate the agent — Ollama or AirLLM, same public interface.
"""Instantiate the agent — Ollama, Grok, or Claude.
Args:
db_file: SQLite file for Agno conversation memory (Ollama path only).
backend: "ollama" | "airllm" | "auto" | None (reads config/env).
model_size: AirLLM size — "8b" | "70b" | "405b" | None (reads config).
backend: "ollama" | "grok" | "claude" | None (reads config/env).
skip_mcp: If True, omit MCP tool servers (Gitea, filesystem).
Use for background tasks (thinking, QA) where MCP's
stdio cancel-scope lifecycle conflicts with asyncio
@@ -327,7 +290,6 @@ def create_timmy(
print_response(message, stream).
"""
resolved = _resolve_backend(backend)
size = model_size or "70b"
if resolved == "claude":
from timmy.backends import ClaudeBackend
@@ -339,11 +301,6 @@ def create_timmy(
return GrokBackend()
if resolved == "airllm":
from timmy.backends import TimmyAirLLMAgent
return TimmyAirLLMAgent(model_size=size)
# Default: Ollama via Agno.
model_name, is_fallback = _resolve_model_with_fallback(
requested_model=None,
@@ -361,11 +318,16 @@ def create_timmy(
logger.info("Using fallback model %s (requested was unavailable)", model_name)
use_tools = _model_supports_tools(model_name)
tools_list = _build_tools_list(use_tools, skip_mcp)
tools_list = _build_tools_list(use_tools, skip_mcp, model_name)
full_prompt = _build_prompt(use_tools, session_id)
agent = _create_ollama_agent(model_name, db_file, tools_list, full_prompt, use_tools)
_warmup_model(model_name)
return agent
return _create_ollama_agent(
db_file=db_file,
model_name=model_name,
tools_list=tools_list,
full_prompt=full_prompt,
use_tools=use_tools,
)
class TimmyWithMemory:

View File

@@ -119,75 +119,84 @@ class BaseAgent(ABC):
"""
pass
async def run(self, message: str) -> str:
"""Run the agent with a message.
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_TRANSIENT = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
Retries on transient failures (connection errors, timeouts) with
exponential backoff. GPU contention from concurrent Ollama
requests causes ReadError / ReadTimeout — these are transient
and should be retried, not raised immediately (#70).
async def run(self, message: str, *, max_retries: int = 3) -> str:
"""Run the agent with a message, retrying on transient failures.
Returns:
Agent response
GPU contention from concurrent Ollama requests causes ReadError /
ReadTimeout — these are transient and retried with exponential
backoff (#70).
"""
max_retries = 3
last_exception = None
# Transient errors that indicate Ollama contention or temporary
# unavailability — these deserve a retry with backoff.
_transient = (
httpx.ConnectError,
httpx.ReadError,
httpx.ReadTimeout,
httpx.ConnectTimeout,
ConnectionError,
TimeoutError,
)
response = await self._run_with_retries(message, max_retries)
await self._emit_response_event(message, response)
return response
async def _run_with_retries(self, message: str, max_retries: int) -> str:
"""Execute agent.run() with retry logic for transient errors."""
for attempt in range(1, max_retries + 1):
try:
result = self.agent.run(message, stream=False)
response = result.content if hasattr(result, "content") else str(result)
break # Success, exit the retry loop
except _transient as exc:
last_exception = exc
if attempt < max_retries:
# Contention backoff — longer waits because the GPU
# needs time to finish the other request.
wait = min(2**attempt, 16)
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting %ds before retry...",
attempt,
max_retries,
type(exc).__name__,
wait,
)
await asyncio.sleep(wait)
else:
logger.error(
"Ollama unreachable after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
return result.content if hasattr(result, "content") else str(result)
except self._TRANSIENT as exc:
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=True,
)
await asyncio.sleep(min(2**attempt, 16))
except Exception as exc:
last_exception = exc
if attempt < max_retries:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
else:
logger.error(
"Agent run failed after %d attempts: %s",
max_retries,
exc,
)
raise last_exception from exc
self._handle_retry_or_raise(
exc,
attempt,
max_retries,
transient=False,
)
await asyncio.sleep(min(2 ** (attempt - 1), 8))
# Unreachable — _handle_retry_or_raise raises on last attempt.
raise RuntimeError("retry loop exited unexpectedly") # pragma: no cover
# Emit completion event
@staticmethod
def _handle_retry_or_raise(
exc: Exception,
attempt: int,
max_retries: int,
*,
transient: bool,
) -> None:
"""Log a retry warning or raise after exhausting attempts."""
if attempt < max_retries:
if transient:
logger.warning(
"Ollama contention on attempt %d/%d: %s. Waiting before retry...",
attempt,
max_retries,
type(exc).__name__,
)
else:
logger.warning(
"Agent run failed on attempt %d/%d: %s. Retrying...",
attempt,
max_retries,
exc,
)
else:
label = "Ollama unreachable" if transient else "Agent run failed"
logger.error("%s after %d attempts: %s", label, max_retries, exc)
raise exc
async def _emit_response_event(self, message: str, response: str) -> None:
"""Publish a completion event to the event bus if connected."""
if self.event_bus:
await self.event_bus.publish(
Event(
@@ -197,8 +206,6 @@ class BaseAgent(ABC):
)
)
return response
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools

View File

@@ -1,11 +1,10 @@
"""LLM backends — AirLLM (local big models), Grok (xAI), and Claude (Anthropic).
"""LLM backends — Grok (xAI) and Claude (Anthropic).
Provides drop-in replacements for the Agno Agent that expose the same
run(message, stream) → RunResult interface used by the dashboard and the
print_response(message, stream) interface used by the CLI.
Backends:
- TimmyAirLLMAgent: Local 8B/70B/405B via AirLLM (Apple Silicon or PyTorch)
- GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium)
- ClaudeBackend: Anthropic Claude API — lightweight cloud fallback
@@ -16,21 +15,11 @@ import logging
import platform
import time
from dataclasses import dataclass
from typing import Literal
from timmy.prompts import get_system_prompt
logger = logging.getLogger(__name__)
# HuggingFace model IDs for each supported size.
_AIRLLM_MODELS: dict[str, str] = {
"8b": "meta-llama/Meta-Llama-3.1-8B-Instruct",
"70b": "meta-llama/Meta-Llama-3.1-70B-Instruct",
"405b": "meta-llama/Meta-Llama-3.1-405B-Instruct",
}
ModelSize = Literal["8b", "70b", "405b"]
@dataclass
class RunResult:
@@ -45,108 +34,6 @@ def is_apple_silicon() -> bool:
return platform.system() == "Darwin" and platform.machine() == "arm64"
def airllm_available() -> bool:
"""Return True when the airllm package is importable."""
try:
import airllm # noqa: F401
return True
except ImportError:
return False
class TimmyAirLLMAgent:
"""Thin AirLLM wrapper compatible with both dashboard and CLI call sites.
Exposes:
run(message, stream) → RunResult(content=...) [dashboard]
print_response(message, stream) → None [CLI]
Maintains a rolling 10-turn in-memory history so Timmy remembers the
conversation within a session — no SQLite needed at this layer.
"""
def __init__(self, model_size: str = "70b") -> None:
model_id = _AIRLLM_MODELS.get(model_size)
if model_id is None:
raise ValueError(
f"Unknown model size {model_size!r}. Choose from: {list(_AIRLLM_MODELS)}"
)
if is_apple_silicon():
from airllm import AirLLMMLX # type: ignore[import]
self._model = AirLLMMLX(model_id)
else:
from airllm import AutoModel # type: ignore[import]
self._model = AutoModel.from_pretrained(model_id)
self._history: list[str] = []
self._model_size = model_size
# ── public interface (mirrors Agno Agent) ────────────────────────────────
def run(self, message: str, *, stream: bool = False) -> RunResult:
"""Run inference and return a structured result (matches Agno Agent.run()).
`stream` is accepted for API compatibility; AirLLM always generates
the full output in one pass.
"""
prompt = self._build_prompt(message)
input_tokens = self._model.tokenizer(
[prompt],
return_tensors="pt",
padding=True,
truncation=True,
max_length=2048,
)
output = self._model.generate(
**input_tokens,
max_new_tokens=512,
use_cache=True,
do_sample=True,
temperature=0.7,
)
# Decode only the newly generated tokens, not the prompt.
input_len = input_tokens["input_ids"].shape[1]
response = self._model.tokenizer.decode(
output[0][input_len:], skip_special_tokens=True
).strip()
self._history.append(f"User: {message}")
self._history.append(f"Timmy: {response}")
return RunResult(content=response)
def print_response(self, message: str, *, stream: bool = True) -> None:
"""Run inference and render the response to stdout (CLI interface)."""
result = self.run(message, stream=stream)
self._render(result.content)
# ── private helpers ──────────────────────────────────────────────────────
def _build_prompt(self, message: str) -> str:
context = get_system_prompt(tools_enabled=False, session_id="airllm") + "\n\n"
# Include the last 10 turns (5 exchanges) for continuity.
if self._history:
context += "\n".join(self._history[-10:]) + "\n\n"
return context + f"User: {message}\nTimmy:"
@staticmethod
def _render(text: str) -> None:
"""Print response with rich markdown when available, plain text otherwise."""
try:
from rich.console import Console
from rich.markdown import Markdown
Console().print(Markdown(text))
except ImportError:
print(text)
# ── Grok (xAI) Backend ─────────────────────────────────────────────────────
# Premium cloud augmentation — opt-in only, never the default path.
@@ -187,7 +74,7 @@ class GrokBackend:
Uses the OpenAI-compatible SDK to connect to xAI's API.
Only activated when GROK_ENABLED=true and XAI_API_KEY is set.
Exposes the same interface as TimmyAirLLMAgent and Agno Agent:
Exposes the same interface as Agno Agent:
run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI]
health_check() → dict [monitoring]
@@ -437,8 +324,7 @@ CLAUDE_MODELS: dict[str, str] = {
class ClaudeBackend:
"""Anthropic Claude backend — cloud fallback when local models are offline.
Uses the official Anthropic SDK. Same interface as GrokBackend and
TimmyAirLLMAgent:
Uses the official Anthropic SDK. Same interface as GrokBackend:
run(message, stream) → RunResult [dashboard]
print_response(message, stream) → None [CLI]
health_check() → dict [monitoring]

View File

@@ -22,13 +22,13 @@ _BACKEND_OPTION = typer.Option(
None,
"--backend",
"-b",
help="Inference backend: 'ollama' (default) | 'airllm' | 'auto'",
help="Inference backend: 'ollama' (default) | 'grok' | 'claude'",
)
_MODEL_SIZE_OPTION = typer.Option(
None,
"--model-size",
"-s",
help="AirLLM model size when --backend airllm: '8b' | '70b' | '405b'",
help="Model size (reserved for future use).",
)
@@ -37,6 +37,35 @@ def _is_interactive() -> bool:
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
def _prompt_interactive(req, tool_name: str, tool_args: dict) -> None:
"""Display tool details and prompt the human for approval."""
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
if typer.confirm("Allow this action?", default=False):
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
"""Auto-approve allowlisted tools; reject everything else."""
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
"""Prompt user to approve/reject dangerous tool calls.
@@ -51,6 +80,7 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
Returns the final RunOutput after all confirmations are resolved.
"""
interactive = _is_interactive() and not autonomous
decide = _prompt_interactive if interactive else _decide_autonomous
max_rounds = 10 # safety limit
for _ in range(max_rounds):
@@ -66,39 +96,10 @@ def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous:
for req in reqs:
if not getattr(req, "needs_confirmation", False):
continue
te = req.tool_execution
tool_name = getattr(te, "tool_name", "unknown")
tool_args = getattr(te, "tool_args", {}) or {}
if interactive:
# Human present — prompt for approval
description = format_action_description(tool_name, tool_args)
impact = get_impact_level(tool_name)
typer.echo()
typer.echo(typer.style("Tool confirmation required", bold=True))
typer.echo(f" Impact: {impact.upper()}")
typer.echo(f" {description}")
typer.echo()
approved = typer.confirm("Allow this action?", default=False)
if approved:
req.confirm()
logger.info("CLI: approved %s", tool_name)
else:
req.reject(note="User rejected from CLI")
logger.info("CLI: rejected %s", tool_name)
else:
# Autonomous mode — check allowlist
if is_allowlisted(tool_name, tool_args):
req.confirm()
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
decide(req, tool_name, tool_args)
# Resume the run so the agent sees the confirmation result
try:
@@ -138,7 +139,7 @@ def think(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
@@ -201,7 +202,7 @@ def chat(
session_id = str(uuid.uuid4())
else:
session_id = _CLI_SESSION_ID
timmy = create_timmy(backend=backend, model_size=model_size, session_id=session_id)
timmy = create_timmy(backend=backend, session_id=session_id)
# Use agent.run() so we can intercept paused runs for tool confirmation.
run_output = timmy.run(message_str, stream=False, session_id=session_id)
@@ -278,7 +279,7 @@ def status(
model_size: str | None = _MODEL_SIZE_OPTION,
):
"""Print Timmy's operational status."""
timmy = create_timmy(backend=backend, model_size=model_size, session_id=_CLI_SESSION_ID)
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(STATUS_PROMPT, stream=False, session_id=_CLI_SESSION_ID)

View File

@@ -21,6 +21,10 @@ Usage::
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from PIL import ImageDraw
import os
import shutil
import sqlite3
@@ -270,20 +274,8 @@ async def create_gitea_issue_via_mcp(title: str, body: str = "", labels: str = "
return f"Failed to create issue via MCP: {exc}"
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
# Background gradient effect — concentric circles
def _draw_background(draw: ImageDraw.ImageDraw, size: int) -> None:
"""Draw radial gradient background with concentric circles."""
for i in range(size // 2, 0, -4):
g = int(25 + (i / (size // 2)) * 30)
draw.ellipse(
@@ -291,33 +283,45 @@ def _generate_avatar_image() -> bytes:
fill=(10, g, 20),
)
# Wizard hat (triangle)
def _draw_wizard(draw: ImageDraw.ImageDraw) -> None:
"""Draw wizard hat, face, eyes, smile, monogram, and robe."""
hat_color = (100, 50, 160) # purple
draw.polygon(
[(256, 40), (160, 220), (352, 220)],
fill=hat_color,
outline=(180, 130, 255),
)
hat_outline = (180, 130, 255)
gold = (220, 190, 50)
pupil = (30, 30, 60)
# Hat brim
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=(180, 130, 255))
# Hat + brim
draw.polygon([(256, 40), (160, 220), (352, 220)], fill=hat_color, outline=hat_outline)
draw.ellipse([140, 200, 372, 250], fill=hat_color, outline=hat_outline)
# Face circle
# Face
draw.ellipse([190, 220, 322, 370], fill=(60, 180, 100), outline=(80, 220, 120))
# Eyes
# Eyes (whites + pupils)
draw.ellipse([220, 275, 248, 310], fill=(255, 255, 255))
draw.ellipse([264, 275, 292, 310], fill=(255, 255, 255))
draw.ellipse([228, 285, 242, 300], fill=(30, 30, 60))
draw.ellipse([272, 285, 286, 300], fill=(30, 30, 60))
draw.ellipse([228, 285, 242, 300], fill=pupil)
draw.ellipse([272, 285, 286, 300], fill=pupil)
# Smile
draw.arc([225, 300, 287, 355], start=10, end=170, fill=(30, 30, 60), width=3)
draw.arc([225, 300, 287, 355], start=10, end=170, fill=pupil, width=3)
# Stars around the hat
# "T" monogram on hat
draw.text((243, 100), "T", fill=gold)
# Robe
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _draw_stars(draw: ImageDraw.ImageDraw) -> None:
"""Draw decorative gold stars around the wizard hat."""
gold = (220, 190, 50)
star_positions = [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]
for sx, sy in star_positions:
for sx, sy in [(120, 100), (380, 120), (100, 300), (400, 280), (256, 10)]:
r = 8
draw.polygon(
[
@@ -333,18 +337,26 @@ def _generate_avatar_image() -> bytes:
fill=gold,
)
# "T" monogram on the hat
draw.text((243, 100), "T", fill=gold)
# Robe / body
draw.polygon(
[(180, 370), (140, 500), (372, 500), (332, 370)],
fill=(40, 100, 70),
outline=(60, 160, 100),
)
def _generate_avatar_image() -> bytes:
"""Generate a Timmy-themed avatar image using Pillow.
Creates a 512x512 wizard-themed avatar with emerald/purple/gold palette.
Returns raw PNG bytes. Falls back to a minimal solid-color image if
Pillow drawing primitives fail.
"""
import io
from PIL import Image, ImageDraw
size = 512
img = Image.new("RGB", (size, size), (15, 25, 20))
draw = ImageDraw.Draw(img)
_draw_background(draw, size)
_draw_wizard(draw)
_draw_stars(draw)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()

View File

@@ -78,83 +78,88 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists but new one doesn't fully)
if not has_memories:
if "memories" not in tables:
logger.info("Migration: Creating unified memories table")
# Schema will be created above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old facts table
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
# Schema will be created by _ensure_schema above
conn.commit()
return
_migrate_episodes(conn, tables)
_migrate_chunks(conn, tables)
_drop_legacy_tables(conn, tables)
conn.commit()
def _migrate_episodes(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate episodes table rows into the unified memories table."""
if "episodes" not in tables:
return
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Migrate chunks table rows into the unified memories table as vault_chunk."""
if "chunks" not in tables:
return
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_tables(conn: sqlite3.Connection, tables: set[str]) -> None:
"""Drop old facts table if it exists."""
if "facts" not in tables:
return
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
"""Get the column names for a table."""
cursor = conn.execute(f"PRAGMA table_info({table_name})")

View File

@@ -98,6 +98,73 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
"""Migrate episodes table rows into the unified memories table."""
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
"""Migrate chunks table rows into the unified memories table."""
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
"""Drop a legacy table if it exists."""
try:
conn.execute(f"DROP TABLE {table}") # noqa: S608
logger.info("Migration: Dropped old %s table", table)
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop %s: %s", table, exc)
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
@@ -110,78 +177,16 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
# Check if we need to migrate (old schema exists)
if not has_memories and (has_episodes or has_chunks or has_facts):
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old tables
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
if "episodes" in tables and has_memories:
_migrate_episodes(conn)
if "chunks" in tables and has_memories:
_migrate_chunks(conn)
if "facts" in tables:
_drop_legacy_table(conn, "facts")
conn.commit()
@@ -298,6 +303,85 @@ def store_memory(
return entry
def _build_search_filters(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build SQL WHERE clause and params from search filters."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_filter(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
) -> list[MemoryEntry]:
"""Score candidate rows by similarity and filter by min_relevance."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results
def search_memories(
query: str,
limit: int = 10,
@@ -320,65 +404,9 @@ def search_memories(
List of MemoryEntry objects sorted by relevance
"""
query_embedding = embed_text(query)
# Build query with filters
conditions = []
params = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
# Fetch candidates (we'll do in-memory similarity for now)
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(limit * 3) # Get more candidates for ranking
with get_connection() as conn:
rows = conn.execute(query_sql, params).fetchall()
# Compute similarity scores
results = []
for row in rows:
entry = MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
else:
# Fallback: check for keyword overlap
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
# Sort by relevance and return top results
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
where_clause, params = _build_search_filters(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
results = _score_and_filter(rows, query, query_embedding, min_relevance)
return results[:limit]

View File

@@ -323,6 +323,75 @@ def session_history(query: str, role: str = "", limit: int = 10) -> str:
_LOW_CONFIDENCE_THRESHOLD = 0.5
def _categorize_entries(
entries: list[dict],
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
"""Split session entries into messages, errors, timmy msgs, user msgs."""
messages = [e for e in entries if e.get("type") == "message"]
errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
return messages, errors, timmy_msgs, user_msgs
def _find_low_confidence(timmy_msgs: list[dict]) -> list[dict]:
"""Return Timmy responses below the confidence threshold."""
return [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
def _find_repeated_topics(user_msgs: list[dict], top_n: int = 5) -> list[tuple[str, int]]:
"""Identify frequently mentioned words in user messages."""
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
return sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:top_n]
def _format_reflection_section(
title: str,
items: list[dict],
formatter: object,
empty_msg: str,
) -> list[str]:
"""Format a titled section with items, or an empty-state message."""
if items:
lines = [f"### {title} ({len(items)})"]
for item in items[:5]:
lines.append(formatter(item)) # type: ignore[operator]
lines.append("")
return lines
return [f"### {title}\n{empty_msg}\n"]
def _build_insights(
low_conf: list[dict],
errors: list[dict],
repeated: list[tuple[str, int]],
) -> list[str]:
"""Generate actionable insight bullets from analysis results."""
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
insights.append(
f'User frequently asks about "{repeated[0][0]}" — consider deepening knowledge here.'
)
return insights or ["Conversations look healthy. Keep up the good work."]
def self_reflect(limit: int = 30) -> str:
"""Review recent conversations and reflect on Timmy's own behavior.
@@ -343,35 +412,12 @@ def self_reflect(limit: int = 30) -> str:
if not entries:
return "No conversation history to reflect on yet."
# Categorize entries
messages = [e for e in entries if e.get("type") == "message"]
errors = [e for e in entries if e.get("type") == "error"]
timmy_msgs = [e for e in messages if e.get("role") == "timmy"]
user_msgs = [e for e in messages if e.get("role") == "user"]
# 1. Low-confidence responses
low_conf = [
m
for m in timmy_msgs
if m.get("confidence") is not None and m["confidence"] < _LOW_CONFIDENCE_THRESHOLD
]
# 2. Identify repeated user topics (simple word frequency)
topic_counts: dict[str, int] = {}
for m in user_msgs:
for word in (m.get("content") or "").lower().split():
cleaned = word.strip(".,!?\"'()[]")
if len(cleaned) > 3:
topic_counts[cleaned] = topic_counts.get(cleaned, 0) + 1
repeated = sorted(
((w, c) for w, c in topic_counts.items() if c >= 3),
key=lambda x: x[1],
reverse=True,
)[:5]
_messages, errors, timmy_msgs, user_msgs = _categorize_entries(entries)
low_conf = _find_low_confidence(timmy_msgs)
repeated = _find_repeated_topics(user_msgs)
# Build reflection report
sections: list[str] = ["## Self-Reflection Report\n"]
sections.append(
f"Reviewed {len(entries)} recent entries: "
f"{len(user_msgs)} user messages, "
@@ -379,32 +425,27 @@ def self_reflect(limit: int = 30) -> str:
f"{len(errors)} errors.\n"
)
# Low confidence
if low_conf:
sections.append(f"### Low-Confidence Responses ({len(low_conf)})")
for m in low_conf[:5]:
ts = (m.get("timestamp") or "?")[:19]
conf = m.get("confidence", 0)
text = (m.get("content") or "")[:120]
sections.append(f"- [{ts}] confidence={conf:.0%}: {text}")
sections.append("")
else:
sections.append(
"### Low-Confidence Responses\nNone found — all responses above threshold.\n"
sections.extend(
_format_reflection_section(
"Low-Confidence Responses",
low_conf,
lambda m: (
f"- [{(m.get('timestamp') or '?')[:19]}] "
f"confidence={m.get('confidence', 0):.0%}: "
f"{(m.get('content') or '')[:120]}"
),
"None found — all responses above threshold.",
)
)
sections.extend(
_format_reflection_section(
"Errors",
errors,
lambda e: f"- [{(e.get('timestamp') or '?')[:19]}] {(e.get('error') or '')[:120]}",
"No errors recorded.",
)
)
# Errors
if errors:
sections.append(f"### Errors ({len(errors)})")
for e in errors[:5]:
ts = (e.get("timestamp") or "?")[:19]
err = (e.get("error") or "")[:120]
sections.append(f"- [{ts}] {err}")
sections.append("")
else:
sections.append("### Errors\nNo errors recorded.\n")
# Repeated topics
if repeated:
sections.append("### Recurring Topics")
for word, count in repeated:
@@ -413,22 +454,8 @@ def self_reflect(limit: int = 30) -> str:
else:
sections.append("### Recurring Topics\nNo strong patterns detected.\n")
# Actionable summary
insights: list[str] = []
if low_conf:
insights.append("Consider studying topics where confidence was low.")
if errors:
insights.append("Review error patterns for recurring infrastructure issues.")
if repeated:
top_topic = repeated[0][0]
insights.append(
f'User frequently asks about "{top_topic}" — consider deepening knowledge here.'
)
if not insights:
insights.append("Conversations look healthy. Keep up the good work.")
sections.append("### Insights")
for insight in insights:
for insight in _build_insights(low_conf, errors, repeated):
sections.append(f"- {insight}")
return "\n".join(sections)

View File

@@ -39,19 +39,21 @@ _DEFAULT_DB = Path("data/thoughts.db")
# qwen3 and other reasoning models wrap chain-of-thought in <think> tags
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Sensitive patterns that must never be stored as facts
_SENSITIVE_PATTERNS = [
"token",
"password",
"secret",
"api_key",
"apikey",
"credential",
".config/",
"/token",
"access_token",
"private_key",
"ssh_key",
# Sensitive patterns that must never be stored as facts.
# Uses compiled regexes with word boundaries so that compound technical
# terms like "max_tokens" or "num_tokens" are NOT falsely flagged.
_SENSITIVE_RE = [
re.compile(r"(?<![a-z_])token(?![a-z_])", re.IGNORECASE), # "token" but not "max_tokens"
re.compile(r"password", re.IGNORECASE),
re.compile(r"secret", re.IGNORECASE),
re.compile(r"api_key", re.IGNORECASE),
re.compile(r"apikey", re.IGNORECASE),
re.compile(r"credential", re.IGNORECASE),
re.compile(r"\.config/"),
re.compile(r"/token\b"),
re.compile(r"access_token", re.IGNORECASE),
re.compile(r"private_key", re.IGNORECASE),
re.compile(r"ssh_key", re.IGNORECASE),
]
# Meta-observation phrases to filter out from distilled facts
@@ -341,6 +343,11 @@ class ThinkingEngine:
)
return None
# Capture arrival time *before* the LLM call so the thought
# timestamp reflects when the cycle started, not when the
# (potentially slow) generation finished. Fixes #582.
arrived_at = datetime.now(UTC).isoformat()
memory_context, system_context, recent_thoughts = self._build_thinking_context()
content, seed_type = await self._generate_novel_thought(
@@ -352,7 +359,7 @@ class ThinkingEngine:
if not content:
return None
thought = self._store_thought(content, seed_type)
thought = self._store_thought(content, seed_type, arrived_at=arrived_at)
self._last_thought_id = thought.id
await self._process_thinking_result(thought)
@@ -543,7 +550,7 @@ class ThinkingEngine:
fact_lower = fact.lower()
# Block sensitive information
if any(pat in fact_lower for pat in _SENSITIVE_PATTERNS):
if any(pat.search(fact) for pat in _SENSITIVE_RE):
logger.warning("Distill: blocked sensitive fact: %s", fact[:60])
continue
@@ -772,23 +779,10 @@ class ThinkingEngine:
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
# ── System snapshot helpers ────────────────────────────────────────────
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
parts: list[str] = []
# Current local time
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts.append(
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
)
# Thought count today (cheap DB query)
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
@@ -796,66 +790,94 @@ class ThinkingEngine:
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
parts.append(f"Thoughts today: {count}")
return f"Thoughts today: {count}"
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
pass
return None
# Recent chat activity (in-memory, no I/O)
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
messages = message_log.all()
if messages:
parts.append(f"Chat messages this session: {len(messages)}")
last = messages[-1]
parts.append(f'Last chat ({last.role}): "{last.content[:80]}"')
else:
parts.append("No chat messages this session")
return [
f"Chat messages this session: {len(messages)}",
f'Last chat ({last.role}): "{last.content[:80]}"',
]
return ["No chat messages this session"]
except Exception as exc:
logger.debug("Chat activity query failed: %s", exc)
pass
return []
# Task queue (lightweight DB query)
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
summary = get_task_summary_for_briefing()
running = summary.get("running", 0)
pending = summary.get("pending_approval", 0)
done = summary.get("completed", 0)
failed = summary.get("failed", 0)
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
parts.append(
return (
f"Tasks: {running} running, {pending} pending, "
f"{done} completed, {failed} failed"
)
except Exception as exc:
logger.debug("Task queue query failed: %s", exc)
pass
return None
# Workspace updates (file-based communication with Hermes)
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
# Count entries (assuming each entry starts with a timestamp or header)
line_count = len([line for line in new_corr.splitlines() if line.strip()])
parts.append(
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
files_str += f", ... (+{len(new_inbox) - 5} more)"
parts.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
lines.append(f"Workspace: {len(new_inbox)} new inbox files: {files_str}")
return lines
except Exception as exc:
logger.debug("Workspace check failed: %s", exc)
pass
return []
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
Returns a short multi-line string with current time, thought count,
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snap_chat_activity())
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""
@@ -1124,32 +1146,59 @@ class ThinkingEngine:
lines.append(f"- [{thought.seed_type}] {snippet}")
return "\n".join(lines)
_thinking_agent = None # cached agent — avoids per-call resource leaks (#525)
async def _call_agent(self, prompt: str) -> str:
"""Call Timmy's agent to generate a thought.
Creates a lightweight agent with skip_mcp=True to avoid the cancel-scope
Reuses a cached agent with skip_mcp=True to avoid the cancel-scope
errors that occur when MCP stdio transports are spawned inside asyncio
background tasks (#72). The thinking engine doesn't need Gitea or
filesystem tools — it only needs the LLM.
background tasks (#72) and to prevent per-call resource leaks (httpx
clients, SQLite connections, model warmups) that caused the thinking
loop to die every ~10 min (#525).
Individual calls are capped at 120 s so a hung Ollama never blocks
the scheduler indefinitely.
Strips ``<think>`` tags from reasoning models (qwen3, etc.) so that
downstream parsers (fact distillation, issue filing) receive clean text.
"""
from timmy.agent import create_timmy
import asyncio
if self._thinking_agent is None:
from timmy.agent import create_timmy
self._thinking_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(120):
run = await self._thinking_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Thinking LLM call timed out after 120 s")
return ""
agent = create_timmy(skip_mcp=True)
run = await agent.arun(prompt, stream=False)
raw = run.content if hasattr(run, "content") else str(run)
return _THINK_TAG_RE.sub("", raw) if raw else raw
def _store_thought(self, content: str, seed_type: str) -> Thought:
"""Persist a thought to SQLite."""
def _store_thought(
self,
content: str,
seed_type: str,
*,
arrived_at: str | None = None,
) -> Thought:
"""Persist a thought to SQLite.
Args:
arrived_at: ISO-8601 timestamp captured when the thinking cycle
started. Falls back to now() for callers that don't supply it.
"""
thought = Thought(
id=str(uuid.uuid4()),
content=content,
seed_type=seed_type,
parent_id=self._last_thought_id,
created_at=datetime.now(UTC).isoformat(),
created_at=arrived_at or datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
@@ -1230,6 +1279,53 @@ class ThinkingEngine:
logger.debug("Failed to broadcast thought: %s", exc)
def _query_thoughts(
db_path: Path, query: str, seed_type: str | None, limit: int
) -> list[sqlite3.Row]:
"""Run the thought-search SQL and return matching rows."""
pattern = f"%{query}%"
with _get_conn(db_path) as conn:
if seed_type:
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, seed_type, limit),
).fetchall()
return conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(pattern, limit),
).fetchall()
def _format_thought_rows(rows: list[sqlite3.Row], query: str, seed_type: str | None) -> str:
"""Format thought rows into a human-readable string."""
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -> str:
"""Search Timmy's thought history for reflections matching a query.
@@ -1247,58 +1343,17 @@ def search_thoughts(query: str, seed_type: str | None = None, limit: int = 10) -
Formatted string with matching thoughts, newest first, including
timestamps and seed types. Returns a helpful message if no matches found.
"""
# Clamp limit to reasonable bounds
limit = max(1, min(limit, 50))
try:
engine = thinking_engine
db_path = engine._db_path
# Build query with optional seed_type filter
with _get_conn(db_path) as conn:
if seed_type:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ? AND seed_type = ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", seed_type, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT id, content, seed_type, created_at
FROM thoughts
WHERE content LIKE ?
ORDER BY created_at DESC
LIMIT ?
""",
(f"%{query}%", limit),
).fetchall()
rows = _query_thoughts(thinking_engine._db_path, query, seed_type, limit)
if not rows:
if seed_type:
return f'No thoughts found matching "{query}" with seed_type="{seed_type}".'
return f'No thoughts found matching "{query}".'
# Format results
lines = [f'Found {len(rows)} thought(s) matching "{query}":']
if seed_type:
lines[0] += f' [seed_type="{seed_type}"]'
lines.append("")
for row in rows:
ts = datetime.fromisoformat(row["created_at"])
local_ts = ts.astimezone()
time_str = local_ts.strftime("%Y-%m-%d %I:%M %p").lstrip("0")
seed = row["seed_type"]
content = row["content"].replace("\n", " ") # Flatten newlines for display
lines.append(f"[{time_str}] ({seed}) {content[:150]}")
return "\n".join(lines)
return _format_thought_rows(rows, query, seed_type)
except Exception as exc:
logger.warning("Thought search failed: %s", exc)

View File

@@ -909,82 +909,35 @@ def _experiment_tool_catalog() -> dict:
}
_CREATIVE_CATALOG_SOURCES: list[tuple[str, str, list[str]]] = [
("creative.tools.git_tools", "GIT_TOOL_CATALOG", ["forge", "helm", "orchestrator"]),
("creative.tools.image_tools", "IMAGE_TOOL_CATALOG", ["pixel", "orchestrator"]),
("creative.tools.music_tools", "MUSIC_TOOL_CATALOG", ["lyra", "orchestrator"]),
("creative.tools.video_tools", "VIDEO_TOOL_CATALOG", ["reel", "orchestrator"]),
("creative.director", "DIRECTOR_TOOL_CATALOG", ["orchestrator"]),
("creative.assembler", "ASSEMBLER_TOOL_CATALOG", ["reel", "orchestrator"]),
]
def _import_creative_catalogs(catalog: dict) -> None:
"""Import and merge creative tool catalogs from creative module."""
# ── Git tools ─────────────────────────────────────────────────────────────
try:
from creative.tools.git_tools import GIT_TOOL_CATALOG
for module_path, attr_name, available_in in _CREATIVE_CATALOG_SOURCES:
_merge_catalog(catalog, module_path, attr_name, available_in)
for tool_id, info in GIT_TOOL_CATALOG.items():
def _merge_catalog(
catalog: dict, module_path: str, attr_name: str, available_in: list[str]
) -> None:
"""Import a single creative catalog and merge its entries."""
try:
from importlib import import_module
source_catalog = getattr(import_module(module_path), attr_name)
for tool_id, info in source_catalog.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["forge", "helm", "orchestrator"],
}
except ImportError:
pass
# ── Image tools ────────────────────────────────────────────────────────────
try:
from creative.tools.image_tools import IMAGE_TOOL_CATALOG
for tool_id, info in IMAGE_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["pixel", "orchestrator"],
}
except ImportError:
pass
# ── Music tools ────────────────────────────────────────────────────────────
try:
from creative.tools.music_tools import MUSIC_TOOL_CATALOG
for tool_id, info in MUSIC_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["lyra", "orchestrator"],
}
except ImportError:
pass
# ── Video tools ────────────────────────────────────────────────────────────
try:
from creative.tools.video_tools import VIDEO_TOOL_CATALOG
for tool_id, info in VIDEO_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
}
except ImportError:
pass
# ── Creative pipeline ──────────────────────────────────────────────────────
try:
from creative.director import DIRECTOR_TOOL_CATALOG
for tool_id, info in DIRECTOR_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["orchestrator"],
}
except ImportError:
pass
# ── Assembler tools ───────────────────────────────────────────────────────
try:
from creative.assembler import ASSEMBLER_TOOL_CATALOG
for tool_id, info in ASSEMBLER_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
"available_in": available_in,
}
except ImportError:
pass

View File

@@ -26,7 +26,7 @@ def get_system_info() -> dict[str, Any]:
- python_version: Python version
- platform: OS platform
- model: Current Ollama model (queried from API)
- model_backend: Configured backend (ollama/airllm/grok)
- model_backend: Configured backend (ollama/grok/claude)
- ollama_url: Ollama host URL
- repo_root: Repository root path
- grok_enabled: Whether GROK is enabled
@@ -127,54 +127,48 @@ def check_ollama_health() -> dict[str, Any]:
return result
def get_memory_status() -> dict[str, Any]:
"""Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
# Check tier 1: Hot memory
def _hot_memory_info(repo_root: Path) -> dict[str, Any]:
"""Tier 1: Hot memory (MEMORY.md) status."""
memory_md = repo_root / "MEMORY.md"
tier1_exists = memory_md.exists()
tier1_content = ""
if tier1_exists:
tier1_content = memory_md.read_text()[:500] # First 500 chars
tier1_content = memory_md.read_text()[:500]
# Check tier 2: Vault
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = []
if tier2_exists:
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()]
tier1_info: dict[str, Any] = {
info: dict[str, Any] = {
"exists": tier1_exists,
"path": str(memory_md),
"preview": " ".join(tier1_content[:200].split()) if tier1_content else None,
}
if tier1_exists:
lines = memory_md.read_text().splitlines()
tier1_info["line_count"] = len(lines)
tier1_info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
info["line_count"] = len(lines)
info["sections"] = [ln.lstrip("# ").strip() for ln in lines if ln.startswith("## ")]
return info
def _vault_info(repo_root: Path) -> dict[str, Any]:
"""Tier 2: Vault (memory/ directory tree) status."""
vault_path = repo_root / "memory" / "self"
tier2_exists = vault_path.exists()
tier2_files = [f.name for f in vault_path.iterdir() if f.is_file()] if tier2_exists else []
# Vault — scan all subdirs under memory/
vault_root = repo_root / "memory"
vault_info: dict[str, Any] = {
info: dict[str, Any] = {
"exists": tier2_exists,
"path": str(vault_path),
"file_count": len(tier2_files),
"files": tier2_files[:10],
}
if vault_root.exists():
vault_info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
vault_info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
info["directories"] = [d.name for d in vault_root.iterdir() if d.is_dir()]
info["total_markdown_files"] = sum(1 for _ in vault_root.rglob("*.md"))
return info
# Tier 3: Semantic memory row count
tier3_info: dict[str, Any] = {"available": False}
def _semantic_memory_info(repo_root: Path) -> dict[str, Any]:
"""Tier 3: Semantic memory (vector DB) status."""
info: dict[str, Any] = {"available": False}
try:
sem_db = repo_root / "data" / "memory.db"
if sem_db.exists():
@@ -184,14 +178,16 @@ def get_memory_status() -> dict[str, Any]:
).fetchone()
if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
tier3_info["available"] = True
tier3_info["vector_count"] = count[0] if count else 0
info["available"] = True
info["vector_count"] = count[0] if count else 0
except Exception as exc:
logger.debug("Memory status query failed: %s", exc)
pass
return info
# Self-coding journal stats
journal_info: dict[str, Any] = {"available": False}
def _journal_info(repo_root: Path) -> dict[str, Any]:
"""Self-coding journal statistics."""
info: dict[str, Any] = {"available": False}
try:
journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists():
@@ -203,7 +199,7 @@ def get_memory_status() -> dict[str, Any]:
if rows:
counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values())
journal_info = {
info = {
"available": True,
"total_attempts": total,
"successes": counts.get("success", 0),
@@ -212,13 +208,24 @@ def get_memory_status() -> dict[str, Any]:
}
except Exception as exc:
logger.debug("Journal stats query failed: %s", exc)
pass
return info
def get_memory_status() -> dict[str, Any]:
"""Get the status of Timmy's memory system.
Returns:
Dict with memory tier information
"""
from config import settings
repo_root = Path(settings.repo_root)
return {
"tier1_hot_memory": tier1_info,
"tier2_vault": vault_info,
"tier3_semantic": tier3_info,
"self_coding_journal": journal_info,
"tier1_hot_memory": _hot_memory_info(repo_root),
"tier2_vault": _vault_info(repo_root),
"tier3_semantic": _semantic_memory_info(repo_root),
"self_coding_journal": _journal_info(repo_root),
}
@@ -319,6 +326,46 @@ def get_live_system_status() -> dict[str, Any]:
return result
def _build_pytest_cmd(venv_python: Path, scope: str) -> list[str]:
"""Build the pytest command list for the given scope."""
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
cmd.append(scope)
return cmd
def _parse_pytest_output(output: str) -> dict[str, int]:
"""Extract passed/failed/error counts from pytest output."""
import re
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
return {"passed": passed, "failed": failed, "errors": errors}
def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[str, Any]:
"""Run Timmy's own test suite and report results.
@@ -342,49 +389,17 @@ def run_self_tests(scope: str = "fast", _repo_root: str | None = None) -> dict[s
if not venv_python.exists():
return {"success": False, "error": f"No venv found at {venv_python}"}
cmd = [str(venv_python), "-m", "pytest", "-x", "-q", "--tb=short", "--timeout=30"]
if scope == "fast":
# Unit tests only — skip functional/e2e/integration
cmd.extend(
[
"--ignore=tests/functional",
"--ignore=tests/e2e",
"--ignore=tests/integrations",
"tests/",
]
)
elif scope == "full":
cmd.append("tests/")
else:
# Specific path
cmd.append(scope)
cmd = _build_pytest_cmd(venv_python, scope)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120, cwd=repo)
output = result.stdout + result.stderr
# Parse pytest output for counts
passed = failed = errors = 0
for line in output.splitlines():
if "passed" in line or "failed" in line or "error" in line:
import re
nums = re.findall(r"(\d+) (passed|failed|error)", line)
for count, kind in nums:
if kind == "passed":
passed = int(count)
elif kind == "failed":
failed = int(count)
elif kind == "error":
errors = int(count)
counts = _parse_pytest_output(output)
return {
"success": result.returncode == 0,
"passed": passed,
"failed": failed,
"errors": errors,
"total": passed + failed + errors,
**counts,
"total": counts["passed"] + counts["failed"] + counts["errors"],
"return_code": result.returncode,
"summary": output[-2000:] if len(output) > 2000 else output,
}

View File

@@ -78,6 +78,11 @@ DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice"
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
@dataclass
class VoiceConfig:
"""Configuration for the voice loop."""
@@ -161,13 +166,6 @@ class VoiceLoop:
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
def _rms(block: np.ndarray) -> float:
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
@@ -177,42 +175,69 @@ class VoiceLoop:
dtype="float32",
blocksize=block_size,
) as stream:
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
rms = _rms(block)
return self._finalize_utterance(chunks, min_blocks, sr)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or max length.
Returns the list of captured audio chunks (may be empty).
"""
chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
else:
audio_chunks.append(block.copy())
silent_count = 0
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if silent_count >= silence_blocks:
break
# End of utterance
if silent_count >= silence_blocks:
break
if len(chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
# Safety cap
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
return chunks
if not audio_chunks or len(audio_chunks) < min_blocks:
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate recorded chunks and report duration.
Returns ``None`` if the utterance is too short to be meaningful.
"""
if not chunks or len(chunks) < min_blocks:
return None
audio = np.concatenate(audio_chunks, axis=0).flatten()
duration = len(audio) / sr
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
@@ -369,15 +394,33 @@ class VoiceLoop:
# ── Main Loop ───────────────────────────────────────────────────────
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
# Whisper hallucinates these on silence/noise — skip them.
_WHISPER_HALLUCINATIONS = frozenset(
{
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
}
)
# Suppress MCP / Agno stderr noise during voice mode.
_suppress_mcp_noise()
# Suppress MCP async-generator teardown tracebacks on exit.
_install_quiet_asyncgen_hooks()
# Spoken phrases that end the voice session.
_EXIT_COMMANDS = frozenset(
{
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
}
)
def _log_banner(self) -> None:
"""Log the startup banner with STT/TTS/LLM configuration."""
tts_label = (
"macOS say"
if self.config.use_say_fallback
@@ -393,52 +436,50 @@ class VoiceLoop:
" Press Ctrl-C to exit.\n" + "=" * 60
)
def _is_hallucination(self, text: str) -> bool:
"""Return True if *text* is a known Whisper hallucination."""
return not text or text.lower() in self._WHISPER_HALLUCINATIONS
def _is_exit_command(self, text: str) -> bool:
"""Return True if the user asked to stop the voice session."""
return text.lower().strip().rstrip(".!") in self._EXIT_COMMANDS
def _process_turn(self, text: str) -> None:
"""Handle a single listen-think-speak turn after transcription."""
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
self._speak(response)
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
_suppress_mcp_noise()
_install_quiet_asyncgen_hooks()
self._log_banner()
self._running = True
try:
while self._running:
# 1. LISTEN — record until silence
audio = self._record_utterance()
if audio is None:
continue
# 2. TRANSCRIBE — Whisper STT
text = self._transcribe(audio)
if not text or text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
):
# Whisper hallucinations on silence/noise
if self._is_hallucination(text):
logger.debug("Ignoring likely Whisper hallucination: '%s'", text)
continue
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
# Exit commands
if text.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
):
if self._is_exit_command(text):
logger.info("👋 Goodbye!")
break
# 3. THINK — send to Timmy
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
# 4. SPEAK — TTS output
self._speak(response)
self._process_turn(text)
except KeyboardInterrupt:
logger.info("👋 Voice loop stopped.")

View File

@@ -2493,3 +2493,57 @@
.db-cell { max-width: 300px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
.db-cell:hover { white-space: normal; word-break: break-all; }
.db-truncated { font-size: 0.7rem; color: var(--amber); padding: 0.3rem 0; }
/* ── Tower ────────────────────────────────────────────────────────────── */
.tower-container { max-width: 1400px; margin: 0 auto; }
.tower-header { margin-bottom: 1rem; }
.tower-title { font-size: 1.6rem; font-weight: 700; color: var(--green); letter-spacing: 0.15em; }
.tower-subtitle { font-size: 0.85rem; color: var(--text-dim); }
.tower-conn-badge { font-size: 0.7rem; font-weight: 600; padding: 2px 8px; border-radius: 3px; letter-spacing: 0.08em; }
.tower-conn-live { color: var(--green); border: 1px solid var(--green); }
.tower-conn-offline { color: var(--red); border: 1px solid var(--red); }
.tower-conn-connecting { color: var(--amber); border: 1px solid var(--amber); }
.tower-phase-card { min-height: 300px; }
.tower-phase-thinking { border-left: 3px solid var(--purple); }
.tower-phase-predicting { border-left: 3px solid var(--orange); }
.tower-phase-advising { border-left: 3px solid var(--green); }
.tower-scroll { max-height: 50vh; overflow-y: auto; }
.tower-empty { text-align: center; color: var(--text-dim); padding: 16px; font-size: 0.85rem; }
.tower-stat-grid { display: grid; grid-template-columns: repeat(4, 1fr); gap: 0.5rem; text-align: center; }
.tower-stat-label { display: block; font-size: 0.65rem; color: var(--text-dim); letter-spacing: 0.1em; }
.tower-stat-value { display: block; font-size: 1.1rem; font-weight: 700; color: var(--text-bright); }
.tower-event { padding: 8px; margin-bottom: 6px; border-left: 3px solid var(--border); border-radius: 3px; background: var(--bg-card); }
.tower-etype-task_posted { border-left-color: var(--purple); }
.tower-etype-bid_submitted { border-left-color: var(--orange); }
.tower-etype-task_completed { border-left-color: var(--green); }
.tower-etype-task_failed { border-left-color: var(--red); }
.tower-etype-agent_joined { border-left-color: var(--purple); }
.tower-etype-tool_executed { border-left-color: var(--amber); }
.tower-ev-head { display: flex; justify-content: space-between; align-items: center; margin-bottom: 4px; }
.tower-ev-badge { font-size: 0.65rem; font-weight: 600; color: var(--text-bright); letter-spacing: 0.08em; }
.tower-ev-dots { font-size: 0.6rem; color: var(--amber); }
.tower-ev-desc { font-size: 0.8rem; color: var(--text); }
.tower-ev-time { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
.tower-pred { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--orange); }
.tower-pred-done { border-left-color: var(--green); }
.tower-pred-pending { border-left-color: var(--amber); }
.tower-pred-head { display: flex; justify-content: space-between; align-items: center; }
.tower-pred-task { font-size: 0.75rem; font-weight: 600; color: var(--text-bright); font-family: monospace; }
.tower-pred-acc { font-size: 0.75rem; font-weight: 700; }
.tower-pred-detail { font-size: 0.75rem; color: var(--text-dim); margin-top: 4px; }
.tower-advisory { padding: 8px; margin-bottom: 6px; border-radius: 3px; background: var(--bg-card); border-left: 3px solid var(--border); }
.tower-adv-high { border-left-color: var(--red); }
.tower-adv-medium { border-left-color: var(--orange); }
.tower-adv-low { border-left-color: var(--green); }
.tower-adv-head { display: flex; justify-content: space-between; font-size: 0.65rem; margin-bottom: 4px; }
.tower-adv-cat { font-weight: 600; color: var(--text-dim); letter-spacing: 0.08em; }
.tower-adv-prio { font-weight: 700; color: var(--amber); }
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }

View File

@@ -18,7 +18,6 @@ except ImportError:
# agno is a core dependency (always installed) — do NOT stub it, or its
# internal import chains break under xdist parallel workers.
for _mod in [
"airllm",
"mcp",
"mcp.client",
"mcp.client.stdio",

View File

@@ -10,12 +10,10 @@ Categories:
M3xx iOS keyboard & zoom prevention
M4xx HTMX robustness (double-submit, sync)
M5xx Safe-area / notch support
M6xx AirLLM backend interface contract
"""
import re
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
# ── helpers ───────────────────────────────────────────────────────────────────
@@ -206,147 +204,3 @@ def test_M505_dvh_units_used():
"""Dynamic viewport height (dvh) accounts for collapsing browser chrome."""
css = _css()
assert "dvh" in css
# ── M6xx — AirLLM backend interface contract ──────────────────────────────────
def test_M601_airllm_agent_has_run_method():
"""TimmyAirLLMAgent must expose run() so the dashboard route can call it."""
from timmy.backends import TimmyAirLLMAgent
assert hasattr(TimmyAirLLMAgent, "run"), (
"TimmyAirLLMAgent is missing run() — dashboard will fail with AirLLM backend"
)
def test_M602_airllm_run_returns_content_attribute():
"""run() must return an object with a .content attribute (Agno RunResponse compat)."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Sir, affirmative."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
result = agent.run("test")
assert hasattr(result, "content"), "run() result must have a .content attribute"
assert isinstance(result.content, str)
def test_M603_airllm_run_updates_history():
"""run() must update _history so multi-turn context is preserved."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
mock_model = MagicMock()
mock_tokenizer = MagicMock()
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 5]
mock_tokenizer.return_value = {"input_ids": input_ids_mock}
mock_tokenizer.decode.return_value = "Acknowledged."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(10))]
agent._model = mock_model
assert len(agent._history) == 0
agent.run("hello")
assert len(agent._history) == 2
assert any("hello" in h for h in agent._history)
def test_M604_airllm_print_response_delegates_to_run():
"""print_response must use run() so both interfaces share one inference path."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import RunResult, TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
with (
patch.object(agent, "run", return_value=RunResult(content="ok")) as mock_run,
patch.object(agent, "_render"),
):
agent.print_response("hello", stream=True)
mock_run.assert_called_once_with("hello", stream=True)
def test_M605_health_status_passes_model_to_template(client):
"""Health status partial must receive the configured model name, not a hardcoded string."""
from config import settings
with patch(
"dashboard.routes.health.check_ollama",
new_callable=AsyncMock,
return_value=True,
):
response = client.get("/health/status")
# Model name should come from settings, not be hardcoded
assert response.status_code == 200
model_short = settings.ollama_model.split(":")[0]
assert model_short in response.text
# ── M7xx — XSS prevention ─────────────────────────────────────────────────────
def _mobile_html() -> str:
"""Read the mobile template source."""
path = Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "mobile.html"
return path.read_text()
def _swarm_live_html() -> str:
"""Read the swarm live template source."""
path = (
Path(__file__).parent.parent.parent / "src" / "dashboard" / "templates" / "swarm_live.html"
)
return path.read_text()
def test_M701_mobile_chat_no_raw_message_interpolation():
"""mobile.html must not interpolate ${message} directly into innerHTML — XSS risk."""
html = _mobile_html()
# The vulnerable pattern is `${message}` inside a template literal assigned to innerHTML
# After the fix, message must only appear via textContent assignment
assert "textContent = message" in html or "textContent=message" in html, (
"mobile.html still uses innerHTML + ${message} interpolation — XSS vulnerability"
)
def test_M702_mobile_chat_user_input_not_in_innerhtml_template_literal():
"""${message} must not appear inside a backtick string that is assigned to innerHTML."""
html = _mobile_html()
# Find all innerHTML += `...` blocks and verify none contain ${message}
blocks = re.findall(r"innerHTML\s*\+=?\s*`([^`]*)`", html, re.DOTALL)
for block in blocks:
assert "${message}" not in block, (
"innerHTML template literal still contains ${message} — XSS vulnerability"
)
def test_M703_swarm_live_agent_name_not_interpolated_in_innerhtml():
"""swarm_live.html must not put ${agent.name} inside innerHTML template literals."""
html = _swarm_live_html()
blocks = re.findall(r"innerHTML\s*=\s*agents\.map\([^;]+\)\.join\([^)]*\)", html, re.DOTALL)
assert len(blocks) == 0, (
"swarm_live.html still uses innerHTML=agents.map(…) with interpolated agent data — XSS vulnerability"
)
def test_M704_swarm_live_uses_textcontent_for_agent_data():
"""swarm_live.html must use textContent (not innerHTML) to set agent name/description."""
html = _swarm_live_html()
assert "textContent" in html, (
"swarm_live.html does not use textContent — agent data may be raw-interpolated into DOM"
)

View File

@@ -0,0 +1,187 @@
"""Tests for Tower dashboard route (/tower)."""
from unittest.mock import MagicMock, patch
def _mock_spark_engine():
"""Return a mock spark_engine with realistic return values."""
engine = MagicMock()
engine.status.return_value = {
"enabled": True,
"events_captured": 5,
"memories_stored": 3,
"predictions": {"total": 2, "avg_accuracy": 0.85},
"event_types": {
"task_posted": 2,
"bid_submitted": 1,
"task_assigned": 1,
"task_completed": 1,
"task_failed": 0,
"agent_joined": 0,
"tool_executed": 0,
"creative_step": 0,
},
}
event = MagicMock()
event.event_type = "task_completed"
event.description = "Task finished"
event.importance = 0.8
event.created_at = "2026-01-01T00:00:00"
event.agent_id = "agent-1234-abcd"
event.task_id = "task-5678-efgh"
event.data = '{"result": "ok"}'
engine.get_timeline.return_value = [event]
pred = MagicMock()
pred.task_id = "task-5678-efgh"
pred.accuracy = 0.9
pred.evaluated_at = "2026-01-01T01:00:00"
pred.created_at = "2026-01-01T00:30:00"
pred.predicted_value = '{"outcome": "success"}'
engine.get_predictions.return_value = [pred]
advisory = MagicMock()
advisory.category = "performance"
advisory.priority = "high"
advisory.title = "Slow tasks"
advisory.detail = "Tasks taking longer than expected"
advisory.suggested_action = "Scale up workers"
engine.get_advisories.return_value = [advisory]
return engine
class TestTowerUI:
"""Tests for GET /tower endpoint."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_returns_200(self, mock_engine, client):
response = client.get("/tower")
assert response.status_code == 200
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_returns_html(self, mock_engine, client):
response = client.get("/tower")
assert "text/html" in response.headers["content-type"]
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_tower_contains_dashboard_content(self, mock_engine, client):
response = client.get("/tower")
body = response.text
assert "tower" in body.lower() or "spark" in body.lower()
class TestSparkSnapshot:
"""Tests for _spark_snapshot helper."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_structure(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
assert snap["type"] == "spark_state"
assert "status" in snap
assert "events" in snap
assert "predictions" in snap
assert "advisories" in snap
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_events_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
ev = snap["events"][0]
assert ev["event_type"] == "task_completed"
assert ev["importance"] == 0.8
assert ev["agent_id"] == "agent-12"
assert ev["task_id"] == "task-567"
assert ev["data"] == {"result": "ok"}
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_predictions_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
pred = snap["predictions"][0]
assert pred["task_id"] == "task-567"
assert pred["accuracy"] == 0.9
assert pred["evaluated"] is True
assert pred["predicted"] == {"outcome": "success"}
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
def test_snapshot_advisories_parsed(self, mock_engine):
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
adv = snap["advisories"][0]
assert adv["category"] == "performance"
assert adv["priority"] == "high"
assert adv["title"] == "Slow tasks"
assert adv["suggested_action"] == "Scale up workers"
@patch("dashboard.routes.tower.spark_engine")
def test_snapshot_handles_empty_state(self, mock_engine):
mock_engine.status.return_value = {"enabled": False}
mock_engine.get_timeline.return_value = []
mock_engine.get_predictions.return_value = []
mock_engine.get_advisories.return_value = []
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
assert snap["events"] == []
assert snap["predictions"] == []
assert snap["advisories"] == []
@patch("dashboard.routes.tower.spark_engine")
def test_snapshot_handles_invalid_json_data(self, mock_engine):
mock_engine.status.return_value = {"enabled": True}
event = MagicMock()
event.event_type = "test"
event.description = "bad data"
event.importance = 0.5
event.created_at = "2026-01-01T00:00:00"
event.agent_id = None
event.task_id = None
event.data = "not-json{"
mock_engine.get_timeline.return_value = [event]
pred = MagicMock()
pred.task_id = None
pred.accuracy = None
pred.evaluated_at = None
pred.created_at = "2026-01-01T00:00:00"
pred.predicted_value = None
mock_engine.get_predictions.return_value = [pred]
mock_engine.get_advisories.return_value = []
from dashboard.routes.tower import _spark_snapshot
snap = _spark_snapshot()
ev = snap["events"][0]
assert ev["data"] == {}
assert "agent_id" not in ev
assert "task_id" not in ev
pred = snap["predictions"][0]
assert pred["task_id"] == "?"
assert pred["predicted"] == {}
class TestTowerWebSocket:
"""Tests for WS /tower/ws endpoint."""
@patch("dashboard.routes.tower.spark_engine", new_callable=_mock_spark_engine)
@patch("dashboard.routes.tower._PUSH_INTERVAL", 0)
def test_ws_sends_initial_snapshot(self, mock_engine, client):
import json
with client.websocket_connect("/tower/ws") as ws:
data = json.loads(ws.receive_text())
assert data["type"] == "spark_state"
assert "status" in data
assert "events" in data

View File

@@ -5,9 +5,14 @@ from datetime import UTC, datetime, timedelta
from unittest.mock import patch
from infrastructure.error_capture import (
_create_bug_report,
_dedup_cache,
_extract_traceback_info,
_get_git_context,
_is_duplicate,
_log_error_event,
_notify_bug_report,
_record_to_session,
_stack_hash,
capture_error,
)
@@ -193,3 +198,91 @@ class TestCaptureError:
def teardown_method(self):
_dedup_cache.clear()
class TestExtractTracebackInfo:
"""Test _extract_traceback_info helper."""
def test_returns_three_tuple(self):
try:
raise ValueError("extract test")
except ValueError as e:
tb_str, affected_file, affected_line = _extract_traceback_info(e)
assert "ValueError" in tb_str
assert "extract test" in tb_str
assert affected_file.endswith(".py")
assert affected_line > 0
def test_file_points_to_raise_site(self):
try:
_make_exception()
except ValueError as e:
_, affected_file, _ = _extract_traceback_info(e)
assert "test_error_capture" in affected_file
class TestLogErrorEvent:
"""Test _log_error_event helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("log test")
except RuntimeError as e:
_log_error_event(e, "test", "abc123", "file.py", 42, {"branch": "main"})
class TestCreateBugReport:
"""Test _create_bug_report helper."""
def test_does_not_crash_on_missing_deps(self):
try:
raise RuntimeError("report test")
except RuntimeError as e:
result = _create_bug_report(
e, "test", None, "abc123", "traceback...", "file.py", 42, {}
)
# May return None if swarm deps unavailable — that's fine
assert result is None or isinstance(result, str)
def test_with_context(self):
try:
raise RuntimeError("ctx test")
except RuntimeError as e:
result = _create_bug_report(e, "test", {"path": "/api"}, "abc", "tb", "f.py", 1, {})
assert result is None or isinstance(result, str)
class TestNotifyBugReport:
"""Test _notify_bug_report helper."""
def test_does_not_crash(self):
try:
raise RuntimeError("notify test")
except RuntimeError as e:
_notify_bug_report(e, "test")
class TestRecordToSession:
"""Test _record_to_session helper."""
def test_does_not_crash_without_recorder(self):
try:
raise RuntimeError("session test")
except RuntimeError as e:
_record_to_session(e, "test")
def test_calls_registered_recorder(self):
from infrastructure.error_capture import register_error_recorder
calls = []
register_error_recorder(lambda **kwargs: calls.append(kwargs))
try:
try:
raise RuntimeError("callback test")
except RuntimeError as e:
_record_to_session(e, "test_source")
assert len(calls) == 1
assert "RuntimeError" in calls[0]["error"]
assert calls[0]["context"] == "test_source"
finally:
register_error_recorder(None)

View File

@@ -0,0 +1,149 @@
"""Tests for provider health history store and API endpoint."""
import time
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock
import pytest
from src.infrastructure.router.history import HealthHistoryStore
@pytest.fixture
def store():
"""In-memory history store for testing."""
s = HealthHistoryStore(db_path=":memory:")
yield s
s.close()
@pytest.fixture
def sample_providers():
return [
{
"name": "anthropic",
"status": "healthy",
"error_rate": 0.01,
"avg_latency_ms": 250.5,
"circuit_state": "closed",
"total_requests": 100,
},
{
"name": "local",
"status": "degraded",
"error_rate": 0.15,
"avg_latency_ms": 80.0,
"circuit_state": "closed",
"total_requests": 50,
},
]
def test_record_and_retrieve(store, sample_providers):
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 1
assert len(history[0]["providers"]) == 2
assert history[0]["providers"][0]["name"] == "anthropic"
assert history[0]["providers"][1]["name"] == "local"
assert "timestamp" in history[0]
def test_multiple_snapshots(store, sample_providers):
store.record_snapshot(sample_providers)
time.sleep(0.01)
store.record_snapshot(sample_providers)
history = store.get_history(hours=1)
assert len(history) == 2
def test_hours_filtering(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=48)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
history = store.get_history(hours=24)
assert len(history) == 1
history = store.get_history(hours=72)
assert len(history) == 2
def test_prune(store, sample_providers):
old_ts = (datetime.now(UTC) - timedelta(hours=200)).isoformat()
store._conn.execute(
"""INSERT INTO snapshots
(timestamp, provider_name, status, error_rate,
avg_latency_ms, circuit_state, total_requests)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(old_ts, "anthropic", "healthy", 0.0, 100.0, "closed", 10),
)
store._conn.commit()
store.record_snapshot(sample_providers)
deleted = store.prune(keep_hours=168)
assert deleted == 1
history = store.get_history(hours=999)
assert len(history) == 1
def test_empty_history(store):
assert store.get_history(hours=24) == []
def test_capture_snapshot_from_router(store):
mock_metrics = MagicMock()
mock_metrics.error_rate = 0.05
mock_metrics.avg_latency_ms = 200.0
mock_metrics.total_requests = 42
mock_provider = MagicMock()
mock_provider.name = "test-provider"
mock_provider.status.value = "healthy"
mock_provider.metrics = mock_metrics
mock_provider.circuit_state.value = "closed"
mock_router = MagicMock()
mock_router.providers = [mock_provider]
store._capture_snapshot(mock_router)
history = store.get_history(hours=1)
assert len(history) == 1
p = history[0]["providers"][0]
assert p["name"] == "test-provider"
assert p["status"] == "healthy"
assert p["error_rate"] == 0.05
assert p["total_requests"] == 42
def test_history_api_endpoint(store, sample_providers):
"""GET /api/v1/router/history returns snapshot data."""
store.record_snapshot(sample_providers)
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.infrastructure.router.api import get_cascade_router
from src.infrastructure.router.api import router as api_router
from src.infrastructure.router.history import get_history_store
app = FastAPI()
app.include_router(api_router)
app.dependency_overrides[get_history_store] = lambda: store
app.dependency_overrides[get_cascade_router] = lambda: MagicMock()
client = TestClient(app)
resp = client.get("/api/v1/router/history?hours=1")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
assert len(data[0]["providers"]) == 2
assert data[0]["providers"][0]["name"] == "anthropic"
app.dependency_overrides.clear()

View File

@@ -174,6 +174,103 @@ class TestDiscordVendor:
assert result is False
class TestExtractContent:
def test_strips_bot_mention(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 12345
msg = MagicMock()
msg.content = "<@12345> hello there"
assert vendor._extract_content(msg) == "hello there"
def test_no_client_user(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user = None
msg = MagicMock()
msg.content = "hello"
assert vendor._extract_content(msg) == "hello"
def test_empty_after_strip(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
vendor._client = MagicMock()
vendor._client.user.id = 99
msg = MagicMock()
msg.content = "<@99>"
assert vendor._extract_content(msg) == ""
class TestInvokeAgent:
@staticmethod
def _make_typing_target():
"""Build a mock target whose .typing() is an async context manager."""
from contextlib import asynccontextmanager
target = AsyncMock()
@asynccontextmanager
async def _typing():
yield
target.typing = _typing
return target
@pytest.mark.asyncio
async def test_timeout_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools", side_effect=TimeoutError
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "too long" in response
@pytest.mark.asyncio
async def test_exception_returns_error(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
vendor = DiscordVendor()
target = self._make_typing_target()
with patch(
"integrations.chat_bridge.vendors.discord.chat_with_tools",
side_effect=RuntimeError("boom"),
):
run_output, response = await vendor._invoke_agent("hi", "sess", target)
assert run_output is None
assert "trouble" in response
class TestSendResponse:
@pytest.mark.asyncio
async def test_skips_empty(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response(None, target)
target.send.assert_not_called()
await DiscordVendor._send_response("", target)
target.send.assert_not_called()
@pytest.mark.asyncio
async def test_sends_short_message(self):
from integrations.chat_bridge.vendors.discord import DiscordVendor
target = AsyncMock()
await DiscordVendor._send_response("hello", target)
target.send.assert_called_once_with("hello")
class TestChunkMessage:
def test_short_message(self):
from integrations.chat_bridge.vendors.discord import _chunk_message

View File

@@ -81,7 +81,6 @@ def test_create_timmy_respects_custom_ollama_url():
mock_settings.ollama_url = custom_url
mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
from timmy.agent import create_timmy
@@ -91,33 +90,6 @@ def test_create_timmy_respects_custom_ollama_url():
assert kwargs["host"] == custom_url
# ── AirLLM path ──────────────────────────────────────────────────────────────
def test_create_timmy_airllm_returns_airllm_agent():
"""backend='airllm' must return a TimmyAirLLMAgent, not an Agno Agent."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.agent import create_timmy
from timmy.backends import TimmyAirLLMAgent
result = create_timmy(backend="airllm", model_size="8b")
assert isinstance(result, TimmyAirLLMAgent)
def test_create_timmy_airllm_does_not_call_agno_agent():
"""When using the airllm backend, Agno Agent should never be instantiated."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.backends.is_apple_silicon", return_value=False),
):
from timmy.agent import create_timmy
create_timmy(backend="airllm", model_size="8b")
MockAgent.assert_not_called()
def test_create_timmy_explicit_ollama_ignores_autodetect():
"""backend='ollama' must always use Ollama, even on Apple Silicon."""
with (
@@ -141,7 +113,6 @@ def test_create_timmy_explicit_ollama_ignores_autodetect():
def test_resolve_backend_explicit_takes_priority():
from timmy.agent import _resolve_backend
assert _resolve_backend("airllm") == "airllm"
assert _resolve_backend("ollama") == "ollama"
@@ -152,39 +123,6 @@ def test_resolve_backend_defaults_to_ollama_without_config():
assert _resolve_backend(None) == "ollama"
def test_resolve_backend_auto_uses_airllm_on_apple_silicon():
"""'auto' on Apple Silicon with airllm stubbed → 'airllm'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=True),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "airllm"
def test_resolve_backend_auto_falls_back_on_non_apple():
"""'auto' on non-Apple Silicon → 'ollama'."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("timmy.agent.settings") as mock_settings,
):
mock_settings.timmy_model_backend = "auto"
mock_settings.airllm_model_size = "70b"
mock_settings.ollama_model = "llama3.2"
from timmy.agent import _resolve_backend
assert _resolve_backend(None) == "ollama"
# ── _model_supports_tools ────────────────────────────────────────────────────
def test_model_supports_tools_llama32_returns_false():
"""llama3.2 (3B) is too small for reliable tool calling."""
from timmy.agent import _model_supports_tools
@@ -259,7 +197,6 @@ def test_create_timmy_includes_tools_for_large_model():
mock_settings.ollama_url = "http://localhost:11434"
mock_settings.ollama_num_ctx = 4096
mock_settings.timmy_model_backend = "ollama"
mock_settings.airllm_model_size = "70b"
mock_settings.telemetry_enabled = False
from timmy.agent import create_timmy
@@ -444,6 +381,150 @@ def test_get_effective_ollama_model_walks_fallback_chain():
assert result == "fb-2"
# ── _build_tools_list ─────────────────────────────────────────────────────
def test_build_tools_list_empty_when_tools_disabled():
"""Small models get an empty tools list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False, model_name="llama3.2")
assert result == []
def test_build_tools_list_includes_toolkit_when_enabled():
"""Tool-capable models get the full toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
assert mock_toolkit in result
def test_build_tools_list_skips_mcp_when_flagged():
"""skip_mcp=True must not call MCP factories."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools") as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools") as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=True, model_name="llama3.1")
mock_gitea.assert_not_called()
mock_fs.assert_not_called()
def test_build_tools_list_includes_mcp_when_not_skipped():
"""skip_mcp=False should attempt MCP tool creation."""
mock_toolkit = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=None) as mock_gitea,
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None) as mock_fs,
):
from timmy.agent import _build_tools_list
_build_tools_list(use_tools=True, skip_mcp=False, model_name="llama3.1")
mock_gitea.assert_called_once()
mock_fs.assert_called_once()
# ── _build_prompt ─────────────────────────────────────────────────────────
def test_build_prompt_includes_base_prompt():
"""Prompt should always contain the base system prompt."""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""Memory context should be appended when available."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "User prefers dark mode."
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "GROUNDED CONTEXT" in result
assert "dark mode" in result
def test_build_prompt_truncates_long_memory():
"""Long memory context should be truncated."""
mock_memory = MagicMock()
mock_memory.get_system_context.return_value = "x" * 10000
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
assert "[truncated]" in result
def test_build_prompt_survives_memory_failure():
"""Prompt should fall back to base when memory fails."""
mock_memory = MagicMock()
mock_memory.get_system_context.side_effect = RuntimeError("db locked")
with patch("timmy.memory_system.memory_system", mock_memory):
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
# Memory context should NOT be appended (the db locked error was caught)
assert "db locked" not in result
# ── _create_ollama_agent ──────────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent must pass the expected kwargs to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.1",
tools_list=[MagicMock()],
full_prompt="test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "test prompt"
assert kwargs["markdown"] is False
def test_create_ollama_agent_none_tools_when_empty():
"""Empty tools_list should pass tools=None to Agent."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
patch("timmy.agent._warmup_model", return_value=True),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
db_file="test.db",
model_name="llama3.2",
tools_list=[],
full_prompt="test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None
def test_no_hardcoded_fallback_constants_in_agent():
"""agent.py must not define module-level DEFAULT_MODEL_FALLBACKS."""
import timmy.agent as agent_mod
@@ -454,127 +535,3 @@ def test_no_hardcoded_fallback_constants_in_agent():
assert not hasattr(agent_mod, "VISION_MODEL_FALLBACKS"), (
"Hardcoded VISION_MODEL_FALLBACKS still exists — use settings.vision_fallback_models"
)
# ── _build_tools_list helper ─────────────────────────────────────────────────
def test_build_tools_list_returns_empty_when_no_tools():
"""When use_tools=False, _build_tools_list returns an empty list."""
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=False, skip_mcp=False)
assert result == []
def test_build_tools_list_includes_toolkit():
"""When use_tools=True, _build_tools_list includes the toolkit."""
mock_toolkit = MagicMock()
with patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=True)
assert mock_toolkit in result
def test_build_tools_list_adds_mcp_when_not_skipped():
"""When skip_mcp=False, _build_tools_list attempts MCP tools."""
mock_toolkit = MagicMock()
mock_gitea = MagicMock()
with (
patch("timmy.agent.create_full_toolkit", return_value=mock_toolkit),
patch("timmy.mcp_tools.create_gitea_mcp_tools", return_value=mock_gitea),
patch("timmy.mcp_tools.create_filesystem_mcp_tools", return_value=None),
):
from timmy.agent import _build_tools_list
result = _build_tools_list(use_tools=True, skip_mcp=False)
assert mock_toolkit in result
assert mock_gitea in result
# ── _build_prompt helper ─────────────────────────────────────────────────────
def test_build_prompt_returns_base_when_no_memory():
"""_build_prompt returns base prompt when memory context is empty."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = ""
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "Timmy" in result
def test_build_prompt_appends_memory_context():
"""_build_prompt appends memory context when available."""
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = "User likes pizza"
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=True, session_id="test")
assert "User likes pizza" in result
assert "GROUNDED CONTEXT" in result
def test_build_prompt_truncates_long_memory_for_small_models():
"""_build_prompt truncates memory for small models (use_tools=False)."""
long_context = "x" * 5000
with patch("timmy.memory_system.memory_system") as mock_mem:
mock_mem.get_system_context.return_value = long_context
from timmy.agent import _build_prompt
result = _build_prompt(use_tools=False, session_id="test")
# Max context is 2000 for small models + truncation marker
assert "[truncated]" in result
# ── _create_ollama_agent helper ──────────────────────────────────────────────
def test_create_ollama_agent_passes_correct_kwargs():
"""_create_ollama_agent passes the expected kwargs to Agent()."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[MagicMock()],
full_prompt="Test prompt",
use_tools=True,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["description"] == "Test prompt"
assert kwargs["tools"] is not None
def test_create_ollama_agent_none_tools_when_empty():
"""_create_ollama_agent passes tools=None when tools_list is empty."""
with (
patch("timmy.agent.Agent") as MockAgent,
patch("timmy.agent.Ollama"),
patch("timmy.agent.SqliteDb"),
):
from timmy.agent import _create_ollama_agent
_create_ollama_agent(
model_name="test-model",
db_file="test.db",
tools_list=[],
full_prompt="Test prompt",
use_tools=False,
)
kwargs = MockAgent.call_args.kwargs
assert kwargs["tools"] is None

View File

@@ -361,6 +361,53 @@ class TestRun:
assert response == "ok"
# ── _handle_retry_or_raise ────────────────────────────────────────────────
class TestHandleRetryOrRaise:
def test_raises_on_last_attempt(self):
BaseAgent = _make_base_class()
with pytest.raises(ValueError, match="boom"):
BaseAgent._handle_retry_or_raise(
ValueError("boom"),
attempt=3,
max_retries=3,
transient=False,
)
def test_raises_on_last_attempt_transient(self):
BaseAgent = _make_base_class()
exc = httpx.ConnectError("down")
with pytest.raises(httpx.ConnectError):
BaseAgent._handle_retry_or_raise(
exc,
attempt=3,
max_retries=3,
transient=True,
)
def test_no_raise_on_early_attempt(self):
BaseAgent = _make_base_class()
# Should return None (no raise) on non-final attempt
result = BaseAgent._handle_retry_or_raise(
ValueError("retry me"),
attempt=1,
max_retries=3,
transient=False,
)
assert result is None
def test_no_raise_on_early_transient(self):
BaseAgent = _make_base_class()
result = BaseAgent._handle_retry_or_raise(
httpx.ReadTimeout("busy"),
attempt=2,
max_retries=3,
transient=True,
)
assert result is None
# ── get_capabilities / get_status ────────────────────────────────────────────

View File

@@ -1,10 +1,7 @@
"""Tests for src/timmy/backends.py — AirLLM wrapper and helpers."""
"""Tests for src/timmy/backends.py — backend helpers and classes."""
import sys
from unittest.mock import MagicMock, patch
import pytest
# ── is_apple_silicon ──────────────────────────────────────────────────────────
@@ -38,183 +35,6 @@ def test_is_apple_silicon_false_on_intel_mac():
assert is_apple_silicon() is False
# ── airllm_available ─────────────────────────────────────────────────────────
def test_airllm_available_true_when_stub_in_sys_modules():
# conftest already stubs 'airllm' — importable → True.
from timmy.backends import airllm_available
assert airllm_available() is True
def test_airllm_available_false_when_not_importable():
# Temporarily remove the stub to simulate airllm not installed.
saved = sys.modules.pop("airllm", None)
try:
from timmy.backends import airllm_available
assert airllm_available() is False
finally:
if saved is not None:
sys.modules["airllm"] = saved
# ── TimmyAirLLMAgent construction ────────────────────────────────────────────
def test_airllm_agent_raises_on_unknown_size():
from timmy.backends import TimmyAirLLMAgent
with pytest.raises(ValueError, match="Unknown model size"):
TimmyAirLLMAgent(model_size="3b")
def test_airllm_agent_uses_automodel_on_non_apple():
"""Non-Apple-Silicon path uses AutoModel.from_pretrained."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="8b")
# sys.modules["airllm"] is a MagicMock; AutoModel.from_pretrained was called.
assert sys.modules["airllm"].AutoModel.from_pretrained.called
def test_airllm_agent_uses_mlx_on_apple_silicon():
"""Apple Silicon path uses AirLLMMLX, not AutoModel."""
with patch("timmy.backends.is_apple_silicon", return_value=True):
from timmy.backends import TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="8b")
assert sys.modules["airllm"].AirLLMMLX.called
def test_airllm_agent_resolves_correct_model_id_for_70b():
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import _AIRLLM_MODELS, TimmyAirLLMAgent
TimmyAirLLMAgent(model_size="70b")
sys.modules["airllm"].AutoModel.from_pretrained.assert_called_with(_AIRLLM_MODELS["70b"])
# ── TimmyAirLLMAgent.print_response ──────────────────────────────────────────
def _make_agent(model_size: str = "8b") -> "TimmyAirLLMAgent": # noqa: F821
"""Helper: create an agent with a fully mocked underlying model."""
with patch("timmy.backends.is_apple_silicon", return_value=False):
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size=model_size)
# Replace the underlying model with a clean mock that returns predictable output.
mock_model = MagicMock()
mock_tokenizer = MagicMock()
# tokenizer() returns a dict-like object with an "input_ids" tensor mock.
input_ids_mock = MagicMock()
input_ids_mock.shape = [1, 10] # shape[1] = prompt token count = 10
token_dict = {"input_ids": input_ids_mock}
mock_tokenizer.return_value = token_dict
# generate() returns a list of token sequences.
mock_tokenizer.decode.return_value = "Sir, affirmative."
mock_model.tokenizer = mock_tokenizer
mock_model.generate.return_value = [list(range(15))] # 15 tokens total
agent._model = mock_model
return agent
def test_print_response_calls_generate():
agent = _make_agent()
agent.print_response("What is sovereignty?", stream=True)
agent._model.generate.assert_called_once()
def test_print_response_decodes_only_generated_tokens():
agent = _make_agent()
agent.print_response("Hello", stream=False)
# decode should be called with tokens starting at index 10 (prompt length).
decode_call = agent._model.tokenizer.decode.call_args
token_slice = decode_call[0][0]
assert list(token_slice) == list(range(10, 15))
def test_print_response_updates_history():
agent = _make_agent()
agent.print_response("First message")
assert any("First message" in turn for turn in agent._history)
assert any("Timmy:" in turn for turn in agent._history)
def test_print_response_history_included_in_second_prompt():
agent = _make_agent()
agent.print_response("First")
# Build the prompt for the second call — history should appear.
prompt = agent._build_prompt("Second")
assert "First" in prompt
assert "Second" in prompt
def test_print_response_stream_flag_accepted():
"""stream=False should not raise — it's accepted for API compatibility."""
agent = _make_agent()
agent.print_response("hello", stream=False) # no error
# ── Prompt formatting tests ────────────────────────────────────────────────
def test_airllm_prompt_contains_formatted_model_name():
"""AirLLM prompt should have actual model name, not literal {model_name}."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "llama3.2:3b"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# Should contain the actual model name, not the placeholder
assert "{model_name}" not in prompt
assert "llama3.2:3b" in prompt
def test_airllm_prompt_gets_lite_tier():
"""AirLLM should get LITE tier prompt (tools_enabled=False)."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "test-model"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# LITE tier should NOT have TOOL USAGE section
assert "TOOL USAGE" not in prompt
# LITE tier should have the basic rules
assert "Be brief by default" in prompt
def test_airllm_prompt_contains_session_id():
"""AirLLM prompt should have session_id formatted, not placeholder."""
with (
patch("timmy.backends.is_apple_silicon", return_value=False),
patch("config.settings") as mock_settings,
):
mock_settings.ollama_model = "test-model"
from timmy.backends import TimmyAirLLMAgent
agent = TimmyAirLLMAgent(model_size="8b")
prompt = agent._build_prompt("test message")
# Should contain the session_id, not the placeholder
assert '{session_id}"' not in prompt
assert 'session "airllm"' in prompt
# ── ClaudeBackend ─────────────────────────────────────────────────────────

View File

@@ -55,14 +55,14 @@ def test_think_sends_topic_to_agent():
)
def test_think_passes_model_size_option():
"""think --model-size 70b must forward the model size to create_timmy."""
def test_think_ignores_model_size_option():
"""think --model-size 70b must not forward model_size to create_timmy."""
mock_timmy = MagicMock()
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["think", "topic", "--model-size", "70b"])
mock_create.assert_called_once_with(backend=None, model_size="70b", session_id="cli")
mock_create.assert_called_once_with(backend=None, session_id="cli")
# ---------------------------------------------------------------------------
@@ -107,19 +107,7 @@ def test_chat_new_session_uses_unique_id():
def test_chat_passes_backend_option():
"""chat --backend airllm must forward the backend to create_timmy."""
mock_run_output = MagicMock()
mock_run_output.content = "OK"
mock_run_output.status = "COMPLETED"
mock_run_output.active_requirements = []
mock_timmy = MagicMock()
mock_timmy.run.return_value = mock_run_output
with patch("timmy.cli.create_timmy", return_value=mock_timmy) as mock_create:
runner.invoke(app, ["chat", "test", "--backend", "airllm"])
mock_create.assert_called_once_with(backend="airllm", model_size=None, session_id="cli")
pass
def test_chat_cleans_response():

View File

@@ -1188,3 +1188,42 @@ def test_references_real_files_blocks_mixed(tmp_path):
# Mix of real and fake files — should fail because of the fake one
text = "Fix src/timmy/thinking.py and also src/timmy/nonexistent_module.py for the memory leak."
assert ThinkingEngine._references_real_files(text) is False
# ---------------------------------------------------------------------------
# Sensitive-pattern regression: max_tokens must NOT be flagged (#625)
# ---------------------------------------------------------------------------
def test_sensitive_patterns_allow_max_tokens():
"""_SENSITIVE_RE should not flag 'max_tokens' as sensitive (#625)."""
from timmy.thinking import _SENSITIVE_RE
safe_facts = [
"The cascade router passes max_tokens to Ollama provider.",
"max_tokens=request.max_tokens in the completion call.",
"num_tokens defaults to 2048.",
"total_prompt_tokens is tracked in stats.",
]
for fact in safe_facts:
assert not any(pat.search(fact) for pat in _SENSITIVE_RE), (
f"False positive: {fact!r} was flagged as sensitive"
)
def test_sensitive_patterns_still_block_real_secrets():
"""_SENSITIVE_RE should still block actual secrets."""
from timmy.thinking import _SENSITIVE_RE
dangerous_facts = [
"The token is abc123def456.",
"Set password to hunter2.",
"api_key = sk-live-xyz",
"Found credential in .env file.",
"access_token expired yesterday.",
"private_key stored in vault.",
]
for fact in dangerous_facts:
assert any(pat.search(fact) for pat in _SENSITIVE_RE), (
f"Missed secret: {fact!r} was NOT flagged as sensitive"
)

View File

@@ -15,7 +15,7 @@ except ImportError:
np = None
try:
from timmy.voice_loop import VoiceConfig, VoiceLoop, _strip_markdown
from timmy.voice_loop import VoiceConfig, VoiceLoop, _rms, _strip_markdown
except ImportError:
pass # pytestmark will skip all tests anyway
@@ -147,6 +147,31 @@ class TestStripMarkdown:
assert "*" not in result
class TestRms:
def test_silent_block(self):
block = np.zeros(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(0.0, abs=1e-7)
def test_loud_block(self):
block = np.ones(1600, dtype=np.float32)
assert _rms(block) == pytest.approx(1.0, abs=1e-5)
class TestFinalizeUtterance:
def test_returns_none_for_empty(self):
assert VoiceLoop._finalize_utterance([], min_blocks=5, sample_rate=16000) is None
def test_returns_none_for_too_short(self):
chunks = [np.zeros(1600, dtype=np.float32) for _ in range(3)]
assert VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000) is None
def test_returns_audio_for_sufficient_chunks(self):
chunks = [np.ones(1600, dtype=np.float32) for _ in range(6)]
result = VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000)
assert result is not None
assert len(result) == 6 * 1600
class TestThink:
def test_think_returns_response(self):
loop = VoiceLoop()
@@ -236,6 +261,7 @@ class TestHallucinationFilter:
"""Whisper tends to hallucinate on silence/noise. The loop should filter these."""
def test_known_hallucinations_filtered(self):
loop = VoiceLoop()
hallucinations = [
"you",
"thanks.",
@@ -243,33 +269,35 @@ class TestHallucinationFilter:
"Bye.",
"Thanks for watching!",
"Thank you for watching!",
"",
]
for text in hallucinations:
assert text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
), f"'{text}' should be filtered"
assert loop._is_hallucination(text), f"'{text}' should be filtered"
def test_real_speech_not_filtered(self):
loop = VoiceLoop()
assert not loop._is_hallucination("Hello Timmy")
assert not loop._is_hallucination("What time is it?")
class TestExitCommands:
"""Voice loop should recognize exit commands."""
def test_exit_commands(self):
loop = VoiceLoop()
exits = ["goodbye", "exit", "quit", "stop", "goodbye timmy", "stop listening"]
for cmd in exits:
assert cmd.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
), f"'{cmd}' should be an exit command"
assert loop._is_exit_command(cmd), f"'{cmd}' should be an exit command"
def test_exit_with_punctuation(self):
loop = VoiceLoop()
assert loop._is_exit_command("goodbye!")
assert loop._is_exit_command("stop.")
def test_non_exit_commands(self):
loop = VoiceLoop()
assert not loop._is_exit_command("hello")
assert not loop._is_exit_command("what time is it")
class TestPlayAudio:

View File

@@ -0,0 +1,109 @@
"""Unit tests for the lightning package (factory + ledger)."""
from __future__ import annotations
import pytest
from lightning.factory import Invoice, MockBackend, get_backend
from lightning.ledger import (
TxStatus,
TxType,
clear,
create_invoice_entry,
get_balance,
get_transactions,
mark_settled,
)
@pytest.fixture(autouse=True)
def _clean_ledger():
"""Reset the in-memory ledger between tests."""
clear()
yield
clear()
# ── Factory tests ────────────────────────────────────────────────────
class TestMockBackend:
def test_create_invoice_returns_invoice(self):
backend = MockBackend()
inv = backend.create_invoice(100, "test memo")
assert isinstance(inv, Invoice)
assert inv.amount_sats == 100
assert inv.memo == "test memo"
assert len(inv.payment_hash) == 64 # SHA-256 hex
assert inv.payment_request.startswith("lnbc")
def test_invoices_have_unique_hashes(self):
backend = MockBackend()
a = backend.create_invoice(10)
b = backend.create_invoice(10)
assert a.payment_hash != b.payment_hash
class TestGetBackend:
def test_returns_mock_backend(self):
backend = get_backend()
assert isinstance(backend, MockBackend)
# ── Ledger tests ─────────────────────────────────────────────────────
class TestLedger:
def test_create_invoice_entry(self):
entry = create_invoice_entry(
payment_hash="abc123",
amount_sats=500,
memo="test",
source="unit_test",
)
assert entry.tx_type == TxType.incoming
assert entry.status == TxStatus.pending
assert entry.amount_sats == 500
def test_mark_settled(self):
create_invoice_entry(payment_hash="hash1", amount_sats=100)
result = mark_settled("hash1", preimage="secret")
assert result is not None
assert result.status == TxStatus.settled
assert result.preimage == "secret"
assert result.settled_at != ""
def test_mark_settled_unknown_hash(self):
assert mark_settled("nonexistent") is None
def test_get_balance_empty(self):
bal = get_balance()
assert bal["net_sats"] == 0
assert bal["available_sats"] == 0
def test_get_balance_with_settled(self):
create_invoice_entry(payment_hash="h1", amount_sats=1000)
mark_settled("h1")
bal = get_balance()
assert bal["incoming_total_sats"] == 1000
assert bal["net_sats"] == 1000
assert bal["available_sats"] == 1000
def test_get_balance_pending_not_counted(self):
create_invoice_entry(payment_hash="h2", amount_sats=500)
bal = get_balance()
assert bal["incoming_total_sats"] == 0
assert bal["pending_incoming_sats"] == 500
def test_get_transactions_returns_entries(self):
create_invoice_entry(payment_hash="t1", amount_sats=10)
create_invoice_entry(payment_hash="t2", amount_sats=20)
txs = get_transactions()
assert len(txs) == 2
def test_get_transactions_filter_by_status(self):
create_invoice_entry(payment_hash="f1", amount_sats=10)
create_invoice_entry(payment_hash="f2", amount_sats=20)
mark_settled("f1")
assert len(get_transactions(status="settled")) == 1
assert len(get_transactions(status="pending")) == 1