diff --git a/src/dashboard/routes/db_explorer.py b/src/dashboard/routes/db_explorer.py
index 7af1751b..007cfb47 100644
--- a/src/dashboard/routes/db_explorer.py
+++ b/src/dashboard/routes/db_explorer.py
@@ -3,6 +3,7 @@
import asyncio
import logging
import sqlite3
+from contextlib import closing
from pathlib import Path
from fastapi import APIRouter, Request
@@ -39,56 +40,50 @@ def _query_database(db_path: str) -> dict:
"""Open a database read-only and return all tables with their rows."""
result = {"tables": {}, "error": None}
try:
- conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
- conn.row_factory = sqlite3.Row
- except Exception as exc:
- result["error"] = str(exc)
- return result
+ with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as conn:
+ conn.row_factory = sqlite3.Row
- try:
- tables = conn.execute(
- "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
- ).fetchall()
- for (table_name,) in tables:
- try:
- rows = conn.execute(
- f"SELECT * FROM [{table_name}] LIMIT {MAX_ROWS}" # noqa: S608
- ).fetchall()
- columns = (
- [
- desc[0]
- for desc in conn.execute(
- f"SELECT * FROM [{table_name}] LIMIT 0"
- ).description
- ]
- if rows
- else []
- ) # noqa: S608
- if not columns and rows:
- columns = list(rows[0].keys())
- elif not columns:
- # Get columns even for empty tables
- cursor = conn.execute(f"PRAGMA table_info([{table_name}])") # noqa: S608
- columns = [r[1] for r in cursor.fetchall()]
- count = conn.execute(f"SELECT COUNT(*) FROM [{table_name}]").fetchone()[0] # noqa: S608
- result["tables"][table_name] = {
- "columns": columns,
- "rows": [dict(r) for r in rows],
- "total_count": count,
- "truncated": count > MAX_ROWS,
- }
- except Exception as exc:
- result["tables"][table_name] = {
- "error": str(exc),
- "columns": [],
- "rows": [],
- "total_count": 0,
- "truncated": False,
- }
+ tables = conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
+ ).fetchall()
+ for (table_name,) in tables:
+ try:
+ rows = conn.execute(
+ f"SELECT * FROM [{table_name}] LIMIT {MAX_ROWS}" # noqa: S608
+ ).fetchall()
+ columns = (
+ [
+ desc[0]
+ for desc in conn.execute(
+ f"SELECT * FROM [{table_name}] LIMIT 0"
+ ).description
+ ]
+ if rows
+ else []
+ ) # noqa: S608
+ if not columns and rows:
+ columns = list(rows[0].keys())
+ elif not columns:
+ # Get columns even for empty tables
+ cursor = conn.execute(f"PRAGMA table_info([{table_name}])") # noqa: S608
+ columns = [r[1] for r in cursor.fetchall()]
+ count = conn.execute(f"SELECT COUNT(*) FROM [{table_name}]").fetchone()[0] # noqa: S608
+ result["tables"][table_name] = {
+ "columns": columns,
+ "rows": [dict(r) for r in rows],
+ "total_count": count,
+ "truncated": count > MAX_ROWS,
+ }
+ except Exception as exc:
+ result["tables"][table_name] = {
+ "error": str(exc),
+ "columns": [],
+ "rows": [],
+ "total_count": 0,
+ "truncated": False,
+ }
except Exception as exc:
result["error"] = str(exc)
- finally:
- conn.close()
return result
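
Reviewer note on the pattern adopted throughout this change: `sqlite3.Connection` is itself a context manager, but a bare `with conn:` only wraps a transaction (commit on success, rollback on error) and leaves the connection open; `contextlib.closing` is what guarantees `close()` on every exit path. A minimal sketch of the read-only idiom used above, against a hypothetical example.db:

    import sqlite3
    from contextlib import closing

    # closing() calls conn.close() on exit; a bare "with conn:" would only
    # commit or roll back and leave the handle open.
    with closing(sqlite3.connect("file:example.db?mode=ro", uri=True)) as conn:
        conn.row_factory = sqlite3.Row  # rows support dict-style access
        names = [
            r["name"]
            for r in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            )
        ]
    # conn is closed here, even if the block raised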
diff --git a/src/dashboard/routes/health.py b/src/dashboard/routes/health.py
index b9455b56..37f4aabb 100644
--- a/src/dashboard/routes/health.py
+++ b/src/dashboard/routes/health.py
@@ -6,8 +6,11 @@ for the Mission Control dashboard.
import asyncio
import logging
+import sqlite3
import time
+from contextlib import closing
from datetime import UTC, datetime
+from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
@@ -134,13 +137,9 @@ def _check_lightning() -> DependencyStatus:
def _check_sqlite() -> DependencyStatus:
"""Check SQLite database status."""
try:
- import sqlite3
- from pathlib import Path
-
db_path = Path(settings.repo_root) / "data" / "timmy.db"
- conn = sqlite3.connect(str(db_path))
- conn.execute("SELECT 1")
- conn.close()
+ with closing(sqlite3.connect(str(db_path))) as conn:
+ conn.execute("SELECT 1")
return DependencyStatus(
name="SQLite Database",
diff --git a/src/dashboard/routes/tasks.py b/src/dashboard/routes/tasks.py
index d7e544d6..cec14779 100644
--- a/src/dashboard/routes/tasks.py
+++ b/src/dashboard/routes/tasks.py
@@ -3,6 +3,7 @@
import logging
import sqlite3
import uuid
+from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -101,8 +102,7 @@ class _TaskView:
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
@@ -121,8 +121,6 @@ async def tasks_page(request: Request):
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
- finally:
- db.close()
return templates.TemplateResponse(
request,
@@ -145,13 +143,10 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
- finally:
- db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -167,13 +162,10 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
- finally:
- db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -189,13 +181,10 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
- finally:
- db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -231,16 +220,13 @@ async def create_task_form(
now = datetime.utcnow().isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
- finally:
- db.close()
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
@@ -283,16 +269,13 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
- finally:
- db.close()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -304,16 +287,13 @@ async def _set_status(request: Request, task_id: str, new_status: str):
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
- finally:
- db.close()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -339,8 +319,7 @@ async def api_create_task(request: Request):
if priority not in VALID_PRIORITIES:
priority = "normal"
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
@@ -356,8 +335,6 @@ async def api_create_task(request: Request):
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
- finally:
- db.close()
return JSONResponse(_row_to_dict(row), status_code=201)
@@ -365,11 +342,8 @@ async def api_create_task(request: Request):
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks():
"""List all tasks as JSON."""
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
- finally:
- db.close()
return JSONResponse([_row_to_dict(r) for r in rows])
@@ -384,16 +358,13 @@ async def api_update_status(task_id: str, request: Request):
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
- finally:
- db.close()
if not row:
raise HTTPException(404, "Task not found")
return JSONResponse(_row_to_dict(row))
@@ -402,12 +373,9 @@ async def api_update_status(task_id: str, request: Request):
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_delete_task(task_id: str):
"""Delete a task."""
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
- finally:
- db.close()
if cursor.rowcount == 0:
raise HTTPException(404, "Task not found")
return JSONResponse({"success": True, "id": task_id})
@@ -421,8 +389,7 @@ async def api_delete_task(task_id: str):
@router.get("/api/queue/status", response_class=JSONResponse)
async def queue_status(assigned_to: str = "default"):
"""Return queue status for the chat panel's agent status indicator."""
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
@@ -431,8 +398,6 @@ async def queue_status(assigned_to: str = "default"):
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
- finally:
- db.close()
if running:
return JSONResponse(
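
One subtlety worth calling out in api_delete_task: cursor.rowcount is read after the with block has already closed the connection. This is safe — sqlite3 populates rowcount as a plain attribute at execute() time, so reading it later never touches the closed handle:

    with closing(_get_db()) as db:
        cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
        db.commit()
    # rowcount was cached by execute(); no live connection is needed here
    if cursor.rowcount == 0:
        raise HTTPException(404, "Task not found")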
diff --git a/src/dashboard/routes/work_orders.py b/src/dashboard/routes/work_orders.py
index 7fe3c5fb..c2cd2a0b 100644
--- a/src/dashboard/routes/work_orders.py
+++ b/src/dashboard/routes/work_orders.py
@@ -3,6 +3,7 @@
import logging
import sqlite3
import uuid
+from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -104,14 +105,11 @@ def _query_wos(db, statuses):
@router.get("/work-orders/queue", response_class=HTMLResponse)
async def work_orders_page(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
pending = _query_wos(db, ["submitted", "triaged"])
active = _query_wos(db, ["approved", "in_progress"])
completed = _query_wos(db, ["completed"])
rejected = _query_wos(db, ["rejected"])
- finally:
- db.close()
return templates.TemplateResponse(
request,
@@ -148,8 +146,7 @@ async def submit_work_order(
priority = priority if priority in PRIORITIES else "medium"
category = category if category in CATEGORIES else "suggestion"
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
db.execute(
"INSERT INTO work_orders (id, title, description, priority, category, submitter, related_files, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
@@ -157,8 +154,6 @@ async def submit_work_order(
)
db.commit()
row = db.execute("SELECT * FROM work_orders WHERE id=?", (wo_id,)).fetchone()
- finally:
- db.close()
wo = _WOView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/work_order_card.html", {"wo": wo})
@@ -171,11 +166,8 @@ async def submit_work_order(
@router.get("/work-orders/queue/pending", response_class=HTMLResponse)
async def pending_partial(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
wos = _query_wos(db, ["submitted", "triaged"])
- finally:
- db.close()
if not wos:
return HTMLResponse(
'
'
@@ -193,11 +185,8 @@ async def pending_partial(request: Request):
@router.get("/work-orders/queue/active", response_class=HTMLResponse)
async def active_partial(request: Request):
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
wos = _query_wos(db, ["approved", "in_progress"])
- finally:
- db.close()
if not wos:
return HTMLResponse(
'
'
@@ -222,8 +211,7 @@ async def _update_status(request: Request, wo_id: str, new_status: str, **extra)
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "rejected") else None
)
- db = _get_db()
- try:
+ with closing(_get_db()) as db:
sets = ["status=?", "completed_at=COALESCE(?, completed_at)"]
vals = [new_status, completed_at]
for col, val in extra.items():
@@ -233,8 +221,6 @@ async def _update_status(request: Request, wo_id: str, new_status: str, **extra)
db.execute(f"UPDATE work_orders SET {', '.join(sets)} WHERE id=?", vals)
db.commit()
row = db.execute("SELECT * FROM work_orders WHERE id=?", (wo_id,)).fetchone()
- finally:
- db.close()
if not row:
raise HTTPException(404, "Work order not found")
wo = _WOView(_row_to_dict(row))
diff --git a/src/infrastructure/events/bus.py b/src/infrastructure/events/bus.py
index e6957d22..fab61f0e 100644
--- a/src/infrastructure/events/bus.py
+++ b/src/infrastructure/events/bus.py
@@ -10,6 +10,7 @@ import json
import logging
import sqlite3
from collections.abc import Callable, Coroutine
+from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
@@ -99,14 +100,11 @@ class EventBus:
if self._persistence_db_path is None:
return
self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(str(self._persistence_db_path))
- try:
+ with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
conn.executescript(_EVENTS_SCHEMA)
conn.commit()
- finally:
- conn.close()
def _get_persistence_conn(self) -> sqlite3.Connection | None:
"""Get a connection to the persistence database."""
@@ -123,27 +121,26 @@ class EventBus:
if conn is None:
return
try:
- task_id = event.data.get("task_id", "")
- agent_id = event.data.get("agent_id", "")
- conn.execute(
- "INSERT OR IGNORE INTO events "
- "(id, event_type, source, task_id, agent_id, data, timestamp) "
- "VALUES (?, ?, ?, ?, ?, ?, ?)",
- (
- event.id,
- event.type,
- event.source,
- task_id,
- agent_id,
- json.dumps(event.data),
- event.timestamp,
- ),
- )
- conn.commit()
+ with closing(conn):
+ task_id = event.data.get("task_id", "")
+ agent_id = event.data.get("agent_id", "")
+ conn.execute(
+ "INSERT OR IGNORE INTO events "
+ "(id, event_type, source, task_id, agent_id, data, timestamp) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)",
+ (
+ event.id,
+ event.type,
+ event.source,
+ task_id,
+ agent_id,
+ json.dumps(event.data),
+ event.timestamp,
+ ),
+ )
+ conn.commit()
except Exception as exc:
logger.debug("Failed to persist event: %s", exc)
- finally:
- conn.close()
# ── Replay ───────────────────────────────────────────────────────────
@@ -170,40 +167,39 @@ class EventBus:
return []
try:
- conditions = []
- params: list = []
+ with closing(conn):
+ conditions = []
+ params: list = []
- if event_type:
- conditions.append("event_type = ?")
- params.append(event_type)
- if source:
- conditions.append("source = ?")
- params.append(source)
- if task_id:
- conditions.append("task_id = ?")
- params.append(task_id)
+ if event_type:
+ conditions.append("event_type = ?")
+ params.append(event_type)
+ if source:
+ conditions.append("source = ?")
+ params.append(source)
+ if task_id:
+ conditions.append("task_id = ?")
+ params.append(task_id)
- where = " AND ".join(conditions) if conditions else "1=1"
- sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
- params.append(limit)
+ where = " AND ".join(conditions) if conditions else "1=1"
+ sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
+ params.append(limit)
- rows = conn.execute(sql, params).fetchall()
+ rows = conn.execute(sql, params).fetchall()
- return [
- Event(
- id=row["id"],
- type=row["event_type"],
- source=row["source"],
- data=json.loads(row["data"]) if row["data"] else {},
- timestamp=row["timestamp"],
- )
- for row in rows
- ]
+ return [
+ Event(
+ id=row["id"],
+ type=row["event_type"],
+ source=row["source"],
+ data=json.loads(row["data"]) if row["data"] else {},
+ timestamp=row["timestamp"],
+ )
+ for row in rows
+ ]
except Exception as exc:
logger.debug("Failed to replay events: %s", exc)
return []
- finally:
- conn.close()
# ── Subscribe / Publish ──────────────────────────────────────────────
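
The bootstrap above leans on two pragmas that make the event log safe for concurrent use: WAL mode lets readers proceed alongside a single writer, and busy_timeout makes SQLite retry for up to 5 seconds instead of raising "database is locked" immediately. The same idiom in isolation, with an illustrative schema standing in for _EVENTS_SCHEMA:

    import sqlite3
    from contextlib import closing

    SCHEMA = """
    CREATE TABLE IF NOT EXISTS events (
        id TEXT PRIMARY KEY,
        event_type TEXT,
        timestamp TEXT
    );
    """

    with closing(sqlite3.connect("events.db")) as conn:
        conn.execute("PRAGMA journal_mode=WAL")   # concurrent readers + one writer
        conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5s on lock contention
        conn.executescript(SCHEMA)
        conn.commit()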
diff --git a/src/infrastructure/models/registry.py b/src/infrastructure/models/registry.py
index e971f150..f5b97311 100644
--- a/src/infrastructure/models/registry.py
+++ b/src/infrastructure/models/registry.py
@@ -11,6 +11,7 @@ model roles (student, teacher, judge/PRM) run on dedicated resources.
import logging
import sqlite3
import threading
+from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
@@ -105,23 +106,22 @@ class ModelRegistry:
def _load_from_db(self) -> None:
"""Bootstrap cache from SQLite."""
try:
- conn = _get_conn()
- for row in conn.execute("SELECT * FROM custom_models WHERE active = 1").fetchall():
- self._models[row["name"]] = CustomModel(
- name=row["name"],
- format=ModelFormat(row["format"]),
- path=row["path"],
- role=ModelRole(row["role"]),
- context_window=row["context_window"],
- description=row["description"],
- registered_at=row["registered_at"],
- active=bool(row["active"]),
- default_temperature=row["default_temperature"],
- max_tokens=row["max_tokens"],
- )
- for row in conn.execute("SELECT * FROM agent_model_assignments").fetchall():
- self._agent_assignments[row["agent_id"]] = row["model_name"]
- conn.close()
+ with closing(_get_conn()) as conn:
+ for row in conn.execute("SELECT * FROM custom_models WHERE active = 1").fetchall():
+ self._models[row["name"]] = CustomModel(
+ name=row["name"],
+ format=ModelFormat(row["format"]),
+ path=row["path"],
+ role=ModelRole(row["role"]),
+ context_window=row["context_window"],
+ description=row["description"],
+ registered_at=row["registered_at"],
+ active=bool(row["active"]),
+ default_temperature=row["default_temperature"],
+ max_tokens=row["max_tokens"],
+ )
+ for row in conn.execute("SELECT * FROM agent_model_assignments").fetchall():
+ self._agent_assignments[row["agent_id"]] = row["model_name"]
except Exception as exc:
logger.warning("Failed to load model registry from DB: %s", exc)
@@ -130,29 +130,28 @@ class ModelRegistry:
def register(self, model: CustomModel) -> CustomModel:
"""Register a new custom model."""
with self._lock:
- conn = _get_conn()
- conn.execute(
- """
- INSERT OR REPLACE INTO custom_models
- (name, format, path, role, context_window, description,
- registered_at, active, default_temperature, max_tokens)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """,
- (
- model.name,
- model.format.value,
- model.path,
- model.role.value,
- model.context_window,
- model.description,
- model.registered_at,
- int(model.active),
- model.default_temperature,
- model.max_tokens,
- ),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn()) as conn:
+ conn.execute(
+ """
+ INSERT OR REPLACE INTO custom_models
+ (name, format, path, role, context_window, description,
+ registered_at, active, default_temperature, max_tokens)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ model.name,
+ model.format.value,
+ model.path,
+ model.role.value,
+ model.context_window,
+ model.description,
+ model.registered_at,
+ int(model.active),
+ model.default_temperature,
+ model.max_tokens,
+ ),
+ )
+ conn.commit()
self._models[model.name] = model
logger.info("Registered model: %s (%s)", model.name, model.format.value)
return model
@@ -162,11 +161,10 @@ class ModelRegistry:
with self._lock:
if name not in self._models:
return False
- conn = _get_conn()
- conn.execute("DELETE FROM custom_models WHERE name = ?", (name,))
- conn.execute("DELETE FROM agent_model_assignments WHERE model_name = ?", (name,))
- conn.commit()
- conn.close()
+ with closing(_get_conn()) as conn:
+ conn.execute("DELETE FROM custom_models WHERE name = ?", (name,))
+ conn.execute("DELETE FROM agent_model_assignments WHERE model_name = ?", (name,))
+ conn.commit()
del self._models[name]
# Remove any agent assignments using this model
self._agent_assignments = {
@@ -193,13 +191,12 @@ class ModelRegistry:
return False
with self._lock:
model.active = active
- conn = _get_conn()
- conn.execute(
- "UPDATE custom_models SET active = ? WHERE name = ?",
- (int(active), name),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn()) as conn:
+ conn.execute(
+ "UPDATE custom_models SET active = ? WHERE name = ?",
+ (int(active), name),
+ )
+ conn.commit()
return True
# ── Agent-model assignments ────────────────────────────────────────────
@@ -210,17 +207,16 @@ class ModelRegistry:
return False
with self._lock:
now = datetime.now(UTC).isoformat()
- conn = _get_conn()
- conn.execute(
- """
- INSERT OR REPLACE INTO agent_model_assignments
- (agent_id, model_name, assigned_at)
- VALUES (?, ?, ?)
- """,
- (agent_id, model_name, now),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn()) as conn:
+ conn.execute(
+ """
+ INSERT OR REPLACE INTO agent_model_assignments
+ (agent_id, model_name, assigned_at)
+ VALUES (?, ?, ?)
+ """,
+ (agent_id, model_name, now),
+ )
+ conn.commit()
self._agent_assignments[agent_id] = model_name
logger.info("Assigned model %s to agent %s", model_name, agent_id)
return True
@@ -230,13 +226,12 @@ class ModelRegistry:
with self._lock:
if agent_id not in self._agent_assignments:
return False
- conn = _get_conn()
- conn.execute(
- "DELETE FROM agent_model_assignments WHERE agent_id = ?",
- (agent_id,),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn()) as conn:
+ conn.execute(
+ "DELETE FROM agent_model_assignments WHERE agent_id = ?",
+ (agent_id,),
+ )
+ conn.commit()
del self._agent_assignments[agent_id]
return True
diff --git a/src/timmy/approvals.py b/src/timmy/approvals.py
index 5ca6bf3e..f093dac7 100644
--- a/src/timmy/approvals.py
+++ b/src/timmy/approvals.py
@@ -13,6 +13,7 @@ Default is always True. The owner changes this intentionally.
import sqlite3
import uuid
+from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -96,80 +97,73 @@ def create_item(
created_at=datetime.now(UTC),
status="pending",
)
- conn = _get_conn(db_path)
- conn.execute(
- """
- INSERT INTO approval_items
- (id, title, description, proposed_action, impact, created_at, status)
- VALUES (?, ?, ?, ?, ?, ?, ?)
- """,
- (
- item.id,
- item.title,
- item.description,
- item.proposed_action,
- item.impact,
- item.created_at.isoformat(),
- item.status,
- ),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ conn.execute(
+ """
+ INSERT INTO approval_items
+ (id, title, description, proposed_action, impact, created_at, status)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ item.id,
+ item.title,
+ item.description,
+ item.proposed_action,
+ item.impact,
+ item.created_at.isoformat(),
+ item.status,
+ ),
+ )
+ conn.commit()
return item
def list_pending(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all pending approval items, newest first."""
- conn = _get_conn(db_path)
- rows = conn.execute(
- "SELECT * FROM approval_items WHERE status = 'pending' ORDER BY created_at DESC"
- ).fetchall()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ rows = conn.execute(
+ "SELECT * FROM approval_items WHERE status = 'pending' ORDER BY created_at DESC"
+ ).fetchall()
return [_row_to_item(r) for r in rows]
def list_all(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all approval items regardless of status, newest first."""
- conn = _get_conn(db_path)
- rows = conn.execute("SELECT * FROM approval_items ORDER BY created_at DESC").fetchall()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ rows = conn.execute("SELECT * FROM approval_items ORDER BY created_at DESC").fetchall()
return [_row_to_item(r) for r in rows]
def get_item(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
- conn = _get_conn(db_path)
- row = conn.execute("SELECT * FROM approval_items WHERE id = ?", (item_id,)).fetchone()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ row = conn.execute("SELECT * FROM approval_items WHERE id = ?", (item_id,)).fetchone()
return _row_to_item(row) if row else None
def approve(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
"""Mark an approval item as approved."""
- conn = _get_conn(db_path)
- conn.execute("UPDATE approval_items SET status = 'approved' WHERE id = ?", (item_id,))
- conn.commit()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ conn.execute("UPDATE approval_items SET status = 'approved' WHERE id = ?", (item_id,))
+ conn.commit()
return get_item(item_id, db_path)
def reject(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
"""Mark an approval item as rejected."""
- conn = _get_conn(db_path)
- conn.execute("UPDATE approval_items SET status = 'rejected' WHERE id = ?", (item_id,))
- conn.commit()
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ conn.execute("UPDATE approval_items SET status = 'rejected' WHERE id = ?", (item_id,))
+ conn.commit()
return get_item(item_id, db_path)
def expire_old(db_path: Path = _DEFAULT_DB) -> int:
"""Auto-expire pending items older than EXPIRY_DAYS. Returns count removed."""
cutoff = (datetime.now(UTC) - timedelta(days=_EXPIRY_DAYS)).isoformat()
- conn = _get_conn(db_path)
- cursor = conn.execute(
- "DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
- (cutoff,),
- )
- conn.commit()
- count = cursor.rowcount
- conn.close()
+ with closing(_get_conn(db_path)) as conn:
+ cursor = conn.execute(
+ "DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
+ (cutoff,),
+ )
+ conn.commit()
+ count = cursor.rowcount
return count
diff --git a/src/timmy/backends.py b/src/timmy/backends.py
index 4fa7b947..bae31f05 100644
--- a/src/timmy/backends.py
+++ b/src/timmy/backends.py
@@ -37,6 +37,7 @@ class RunResult:
"""Minimal Agno-compatible run result — carries the model's response text."""
content: str
+ confidence: float | None = None
def is_apple_silicon() -> bool:
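
The new confidence field lets a backend attach the heuristic score from timmy.confidence (added below) without touching the response text. A sketch of how a caller might populate it — this wiring is an assumption, not something this diff adds, and it presumes RunResult is a dataclass (its decorator sits outside the hunk):

    from timmy.backends import RunResult
    from timmy.confidence import estimate_confidence

    def to_result(text: str) -> RunResult:
        # Score the text as-is; estimate_confidence never modifies content.
        return RunResult(content=text, confidence=estimate_confidence(text))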
diff --git a/src/timmy/briefing.py b/src/timmy/briefing.py
index a63686b3..0527f155 100644
--- a/src/timmy/briefing.py
+++ b/src/timmy/briefing.py
@@ -10,6 +10,7 @@ regenerates the briefing every 6 hours.
import logging
import sqlite3
+from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -74,28 +75,26 @@ def _get_cache_conn(db_path: Path = _DEFAULT_DB) -> sqlite3.Connection:
def _save_briefing(briefing: Briefing, db_path: Path = _DEFAULT_DB) -> None:
- conn = _get_cache_conn(db_path)
- conn.execute(
- """
- INSERT INTO briefings (generated_at, period_start, period_end, summary)
- VALUES (?, ?, ?, ?)
- """,
- (
- briefing.generated_at.isoformat(),
- briefing.period_start.isoformat(),
- briefing.period_end.isoformat(),
- briefing.summary,
- ),
- )
- conn.commit()
- conn.close()
+ with closing(_get_cache_conn(db_path)) as conn:
+ conn.execute(
+ """
+ INSERT INTO briefings (generated_at, period_start, period_end, summary)
+ VALUES (?, ?, ?, ?)
+ """,
+ (
+ briefing.generated_at.isoformat(),
+ briefing.period_start.isoformat(),
+ briefing.period_end.isoformat(),
+ briefing.summary,
+ ),
+ )
+ conn.commit()
def _load_latest(db_path: Path = _DEFAULT_DB) -> Briefing | None:
"""Load the most-recently cached briefing, or None if there is none."""
- conn = _get_cache_conn(db_path)
- row = conn.execute("SELECT * FROM briefings ORDER BY generated_at DESC LIMIT 1").fetchone()
- conn.close()
+ with closing(_get_cache_conn(db_path)) as conn:
+ row = conn.execute("SELECT * FROM briefings ORDER BY generated_at DESC LIMIT 1").fetchone()
if row is None:
return None
return Briefing(
@@ -129,27 +128,25 @@ def _gather_swarm_summary(since: datetime) -> str:
return "No swarm activity recorded yet."
try:
- conn = sqlite3.connect(str(swarm_db))
- conn.row_factory = sqlite3.Row
+ with closing(sqlite3.connect(str(swarm_db))) as conn:
+ conn.row_factory = sqlite3.Row
- since_iso = since.isoformat()
+ since_iso = since.isoformat()
- completed = conn.execute(
- "SELECT COUNT(*) as c FROM tasks WHERE status = 'completed' AND created_at > ?",
- (since_iso,),
- ).fetchone()["c"]
+ completed = conn.execute(
+ "SELECT COUNT(*) as c FROM tasks WHERE status = 'completed' AND created_at > ?",
+ (since_iso,),
+ ).fetchone()["c"]
- failed = conn.execute(
- "SELECT COUNT(*) as c FROM tasks WHERE status = 'failed' AND created_at > ?",
- (since_iso,),
- ).fetchone()["c"]
+ failed = conn.execute(
+ "SELECT COUNT(*) as c FROM tasks WHERE status = 'failed' AND created_at > ?",
+ (since_iso,),
+ ).fetchone()["c"]
- agents = conn.execute(
- "SELECT COUNT(*) as c FROM agents WHERE registered_at > ?",
- (since_iso,),
- ).fetchone()["c"]
-
- conn.close()
+ agents = conn.execute(
+ "SELECT COUNT(*) as c FROM agents WHERE registered_at > ?",
+ (since_iso,),
+ ).fetchone()["c"]
parts = []
if completed:
diff --git a/src/timmy/confidence.py b/src/timmy/confidence.py
new file mode 100644
index 00000000..d23e23a0
--- /dev/null
+++ b/src/timmy/confidence.py
@@ -0,0 +1,128 @@
+"""Confidence estimation for Timmy's responses.
+
+Implements SOUL.md requirement: "When I am uncertain, I must say so in
+proportion to my uncertainty."
+
+This module provides heuristics to estimate confidence based on linguistic
+signals in the response text. It measures uncertainty without modifying
+the response content.
+"""
+
+import re
+
+# Hedging words that indicate uncertainty
+HEDGING_WORDS = [
+ "i think",
+ "maybe",
+ "perhaps",
+ "not sure",
+ "might",
+ "could be",
+ "possibly",
+ "i believe",
+ "approximately",
+ "roughly",
+ "probably",
+ "likely",
+ "seems",
+ "appears",
+ "suggests",
+ "i guess",
+ "i suppose",
+ "sort of",
+ "kind of",
+ "somewhat",
+ "fairly",
+ "relatively",
+ "i'm not certain",
+ "i am not certain",
+ "uncertain",
+ "unclear",
+]
+
+# Certainty words that indicate confidence
+CERTAINTY_WORDS = [
+ "i know",
+ "definitely",
+ "certainly",
+ "the answer is",
+ "specifically",
+ "exactly",
+ "absolutely",
+ "without doubt",
+ "i am certain",
+ "i'm certain",
+ "it is true that",
+ "fact is",
+ "in fact",
+ "indeed",
+ "undoubtedly",
+ "clearly",
+ "obviously",
+ "conclusively",
+]
+
+# Very low confidence indicators (direct admissions of ignorance)
+LOW_CONFIDENCE_PATTERNS = [
+ r"i\s+(?:don't|do not)\s+know",
+ r"i\s+(?:am|I'm|i'm)\s+(?:not\s+sure|unsure)",
+ r"i\s+have\s+no\s+(?:idea|clue)",
+ r"i\s+cannot\s+(?:say|tell|answer)",
+ r"i\s+can't\s+(?:say|tell|answer)",
+]
+
+
+def estimate_confidence(text: str) -> float:
+ """Estimate confidence level of a response based on linguistic signals.
+
+ Analyzes the text for hedging words (reducing confidence) and certainty
+ words (increasing confidence). Returns a score between 0.0 and 1.0.
+
+ Args:
+ text: The response text to analyze.
+
+ Returns:
+ A float between 0.0 (very uncertain) and 1.0 (very confident).
+ """
+ if not text or not text.strip():
+ return 0.0
+
+ text_lower = text.lower().strip()
+ confidence = 0.5 # Start with neutral confidence
+
+ # Check for direct admissions of ignorance (very low confidence)
+ for pattern in LOW_CONFIDENCE_PATTERNS:
+ if re.search(pattern, text_lower):
+ # Direct admission of not knowing - very low confidence
+ confidence = 0.15
+ break
+
+ # Count hedging words (reduce confidence)
+ hedging_count = 0
+ for hedge in HEDGING_WORDS:
+ if hedge in text_lower:
+ hedging_count += 1
+
+ # Count certainty words (increase confidence)
+ certainty_count = 0
+ for certain in CERTAINTY_WORDS:
+ if certain in text_lower:
+ certainty_count += 1
+
+ # Adjust confidence based on word counts
+ # Each hedging word reduces confidence by 0.1
+ # Each certainty word increases confidence by 0.1
+ confidence -= hedging_count * 0.1
+ confidence += certainty_count * 0.1
+
+ # Short factual answers get a small boost
+ word_count = len(text.split())
+ if word_count <= 5 and confidence > 0.3:
+ confidence += 0.1
+
+ # Questions in response indicate uncertainty
+ if "?" in text:
+ confidence -= 0.15
+
+ # Clamp to valid range
+ return max(0.0, min(1.0, confidence))
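
A quick sanity check of the three regimes the heuristic distinguishes, with scores derived from the constants above:

    from timmy.confidence import estimate_confidence

    estimate_confidence("Paris is the capital of France.")  # 0.5  — neutral base
    estimate_confidence("I think it might rain, maybe.")    # 0.2  — three hedges stack
    estimate_confidence("I don't know.")                    # 0.15 — admission floor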
diff --git a/src/timmy/mcp_tools.py b/src/timmy/mcp_tools.py
index 128c9b4d..a9522ba2 100644
--- a/src/timmy/mcp_tools.py
+++ b/src/timmy/mcp_tools.py
@@ -25,6 +25,7 @@ import os
import shutil
import sqlite3
import uuid
+from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -163,37 +164,36 @@ def _bridge_to_work_order(title: str, body: str, category: str) -> None:
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(str(db_path))
- conn.execute(
- """CREATE TABLE IF NOT EXISTS work_orders (
- id TEXT PRIMARY KEY,
- title TEXT NOT NULL,
- description TEXT DEFAULT '',
- priority TEXT DEFAULT 'medium',
- category TEXT DEFAULT 'suggestion',
- submitter TEXT DEFAULT 'dashboard',
- related_files TEXT DEFAULT '',
- status TEXT DEFAULT 'submitted',
- result TEXT DEFAULT '',
- rejection_reason TEXT DEFAULT '',
- created_at TEXT DEFAULT (datetime('now')),
- completed_at TEXT
- )"""
- )
- conn.execute(
- "INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
- "VALUES (?, ?, ?, ?, ?, ?)",
- (
- str(uuid.uuid4()),
- title,
- body,
- category,
- "timmy-thinking",
- datetime.utcnow().isoformat(),
- ),
- )
- conn.commit()
- conn.close()
+ with closing(sqlite3.connect(str(db_path))) as conn:
+ conn.execute(
+ """CREATE TABLE IF NOT EXISTS work_orders (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ description TEXT DEFAULT '',
+ priority TEXT DEFAULT 'medium',
+ category TEXT DEFAULT 'suggestion',
+ submitter TEXT DEFAULT 'dashboard',
+ related_files TEXT DEFAULT '',
+ status TEXT DEFAULT 'submitted',
+ result TEXT DEFAULT '',
+ rejection_reason TEXT DEFAULT '',
+ created_at TEXT DEFAULT (datetime('now')),
+ completed_at TEXT
+ )"""
+ )
+ conn.execute(
+ "INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
+ "VALUES (?, ?, ?, ?, ?, ?)",
+ (
+ str(uuid.uuid4()),
+ title,
+ body,
+ category,
+ "timmy-thinking",
+ datetime.utcnow().isoformat(),
+ ),
+ )
+ conn.commit()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)
diff --git a/src/timmy/semantic_memory.py b/src/timmy/semantic_memory.py
index 853cfe5f..b8608c78 100644
--- a/src/timmy/semantic_memory.py
+++ b/src/timmy/semantic_memory.py
@@ -15,6 +15,7 @@ import hashlib
import json
import logging
import sqlite3
+from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
@@ -113,20 +114,19 @@ class SemanticMemory:
def _init_db(self) -> None:
"""Initialize SQLite with vector storage."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
- conn = sqlite3.connect(str(self.db_path))
- conn.execute("""
- CREATE TABLE IF NOT EXISTS chunks (
- id TEXT PRIMARY KEY,
- source TEXT NOT NULL,
- content TEXT NOT NULL,
- embedding TEXT NOT NULL,
- created_at TEXT NOT NULL,
- source_hash TEXT NOT NULL
- )
- """)
- conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
- conn.commit()
- conn.close()
+ with closing(sqlite3.connect(str(self.db_path))) as conn:
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS chunks (
+ id TEXT PRIMARY KEY,
+ source TEXT NOT NULL,
+ content TEXT NOT NULL,
+ embedding TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ source_hash TEXT NOT NULL
+ )
+ """)
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
+ conn.commit()
def index_file(self, filepath: Path) -> int:
"""Index a single file into semantic memory."""
@@ -136,39 +136,37 @@ class SemanticMemory:
content = filepath.read_text()
file_hash = hashlib.md5(content.encode()).hexdigest()
- # Check if already indexed with same hash
- conn = sqlite3.connect(str(self.db_path))
- cursor = conn.execute(
- "SELECT source_hash FROM chunks WHERE source = ? LIMIT 1", (str(filepath),)
- )
- existing = cursor.fetchone()
- if existing and existing[0] == file_hash:
- conn.close()
- return 0 # Already indexed
-
- # Delete old chunks for this file
- conn.execute("DELETE FROM chunks WHERE source = ?", (str(filepath),))
-
- # Split into chunks (paragraphs)
- chunks = self._split_into_chunks(content)
-
- # Index each chunk
- now = datetime.now(UTC).isoformat()
- for i, chunk_text in enumerate(chunks):
- if len(chunk_text.strip()) < 20: # Skip tiny chunks
- continue
-
- chunk_id = f"{filepath.stem}_{i}"
- embedding = embed_text(chunk_text)
-
- conn.execute(
- """INSERT INTO chunks (id, source, content, embedding, created_at, source_hash)
- VALUES (?, ?, ?, ?, ?, ?)""",
- (chunk_id, str(filepath), chunk_text, json.dumps(embedding), now, file_hash),
+ with closing(sqlite3.connect(str(self.db_path))) as conn:
+ # Check if already indexed with same hash
+ cursor = conn.execute(
+ "SELECT source_hash FROM chunks WHERE source = ? LIMIT 1", (str(filepath),)
)
+ existing = cursor.fetchone()
+ if existing and existing[0] == file_hash:
+ return 0 # Already indexed
- conn.commit()
- conn.close()
+ # Delete old chunks for this file
+ conn.execute("DELETE FROM chunks WHERE source = ?", (str(filepath),))
+
+ # Split into chunks (paragraphs)
+ chunks = self._split_into_chunks(content)
+
+ # Index each chunk
+ now = datetime.now(UTC).isoformat()
+ for i, chunk_text in enumerate(chunks):
+ if len(chunk_text.strip()) < 20: # Skip tiny chunks
+ continue
+
+ chunk_id = f"{filepath.stem}_{i}"
+ embedding = embed_text(chunk_text)
+
+ conn.execute(
+ """INSERT INTO chunks (id, source, content, embedding, created_at, source_hash)
+ VALUES (?, ?, ?, ?, ?, ?)""",
+ (chunk_id, str(filepath), chunk_text, json.dumps(embedding), now, file_hash),
+ )
+
+ conn.commit()
logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks))
return len(chunks)
@@ -222,13 +220,11 @@ class SemanticMemory:
"""Search for relevant memory chunks."""
query_embedding = embed_text(query)
- conn = sqlite3.connect(str(self.db_path))
- conn.row_factory = sqlite3.Row
+ with closing(sqlite3.connect(str(self.db_path))) as conn:
+ conn.row_factory = sqlite3.Row
- # Get all chunks (in production, use vector index)
- rows = conn.execute("SELECT source, content, embedding FROM chunks").fetchall()
-
- conn.close()
+ # Get all chunks (in production, use vector index)
+ rows = conn.execute("SELECT source, content, embedding FROM chunks").fetchall()
# Calculate similarities
scored = []
@@ -268,10 +264,9 @@ class SemanticMemory:
def stats(self) -> dict:
"""Get indexing statistics."""
- conn = sqlite3.connect(str(self.db_path))
- cursor = conn.execute("SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks")
- total_chunks, total_files = cursor.fetchone()
- conn.close()
+ with closing(sqlite3.connect(str(self.db_path))) as conn:
+ cursor = conn.execute("SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks")
+ total_chunks, total_files = cursor.fetchone()
return {
"total_chunks": total_chunks,
diff --git a/src/timmy/session_logger.py b/src/timmy/session_logger.py
index 8a52088d..f23704bc 100644
--- a/src/timmy/session_logger.py
+++ b/src/timmy/session_logger.py
@@ -38,21 +38,23 @@ class SessionLogger:
# In-memory buffer
self._buffer: list[dict] = []
- def record_message(self, role: str, content: str) -> None:
+ def record_message(self, role: str, content: str, confidence: float | None = None) -> None:
"""Record a user message.
Args:
role: "user" or "timmy"
content: The message content
+ confidence: Optional confidence score (0.0 to 1.0)
"""
- self._buffer.append(
- {
- "type": "message",
- "role": role,
- "content": content,
- "timestamp": datetime.now().isoformat(),
- }
- )
+ entry = {
+ "type": "message",
+ "role": role,
+ "content": content,
+ "timestamp": datetime.now().isoformat(),
+ }
+ if confidence is not None:
+ entry["confidence"] = confidence
+ self._buffer.append(entry)
def record_tool_call(self, tool_name: str, args: dict, result: str) -> None:
"""Record a tool call.
diff --git a/src/timmy/thinking.py b/src/timmy/thinking.py
index e7726c16..1c987999 100644
--- a/src/timmy/thinking.py
+++ b/src/timmy/thinking.py
@@ -21,6 +21,7 @@ import logging
import random
import sqlite3
import uuid
+from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
@@ -320,19 +321,17 @@ class ThinkingEngine:
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""
- conn = _get_conn(self._db_path)
- rows = conn.execute(
- "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
- (limit,),
- ).fetchall()
- conn.close()
+ with closing(_get_conn(self._db_path)) as conn:
+ rows = conn.execute(
+ "SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
+ (limit,),
+ ).fetchall()
return [_row_to_thought(r) for r in rows]
def get_thought(self, thought_id: str) -> Thought | None:
"""Retrieve a single thought by ID."""
- conn = _get_conn(self._db_path)
- row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
- conn.close()
+ with closing(_get_conn(self._db_path)) as conn:
+ row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
return _row_to_thought(row) if row else None
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
@@ -342,26 +341,24 @@ class ThinkingEngine:
"""
chain = []
current_id: str | None = thought_id
- conn = _get_conn(self._db_path)
- for _ in range(max_depth):
- if not current_id:
- break
- row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
- if not row:
- break
- chain.append(_row_to_thought(row))
- current_id = row["parent_id"]
+ with closing(_get_conn(self._db_path)) as conn:
+ for _ in range(max_depth):
+ if not current_id:
+ break
+ row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
+ if not row:
+ break
+ chain.append(_row_to_thought(row))
+ current_id = row["parent_id"]
- conn.close()
chain.reverse() # Chronological order
return chain
def count_thoughts(self) -> int:
"""Return total number of stored thoughts."""
- conn = _get_conn(self._db_path)
- count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
- conn.close()
+ with closing(_get_conn(self._db_path)) as conn:
+ count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
@@ -369,25 +366,23 @@ class ThinkingEngine:
Returns the number of deleted rows.
"""
- conn = _get_conn(self._db_path)
- try:
- total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
- if total <= keep_min:
+ with closing(_get_conn(self._db_path)) as conn:
+ try:
+ total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
+ if total <= keep_min:
+ return 0
+ cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
+ cursor = conn.execute(
+ "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
+ "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
+ (cutoff, keep_min),
+ )
+ deleted = cursor.rowcount
+ conn.commit()
+ return deleted
+ except Exception as exc:
+ logger.warning("Thought pruning failed: %s", exc)
return 0
- cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
- cursor = conn.execute(
- "DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
- "(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
- (cutoff, keep_min),
- )
- deleted = cursor.rowcount
- conn.commit()
- return deleted
- except Exception as exc:
- logger.warning("Thought pruning failed: %s", exc)
- return 0
- finally:
- conn.close()
# ── Private helpers ──────────────────────────────────────────────────
@@ -608,12 +603,11 @@ class ThinkingEngine:
# Thought count today (cheap DB query)
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
- conn = _get_conn(self._db_path)
- count = conn.execute(
- "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
- (today_start.isoformat(),),
- ).fetchone()["c"]
- conn.close()
+ with closing(_get_conn(self._db_path)) as conn:
+ count = conn.execute(
+ "SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
+ (today_start.isoformat(),),
+ ).fetchone()["c"]
parts.append(f"Thoughts today: {count}")
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
@@ -966,16 +960,21 @@ class ThinkingEngine:
created_at=datetime.now(UTC).isoformat(),
)
- conn = _get_conn(self._db_path)
- conn.execute(
- """
- INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
- VALUES (?, ?, ?, ?, ?)
- """,
- (thought.id, thought.content, thought.seed_type, thought.parent_id, thought.created_at),
- )
- conn.commit()
- conn.close()
+ with closing(_get_conn(self._db_path)) as conn:
+ conn.execute(
+ """
+ INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+ (
+ thought.id,
+ thought.content,
+ thought.seed_type,
+ thought.parent_id,
+ thought.created_at,
+ ),
+ )
+ conn.commit()
return thought
def _log_event(self, thought: Thought) -> None:
diff --git a/src/timmy/tools.py b/src/timmy/tools.py
index ec8db02b..3fe1ef2f 100644
--- a/src/timmy/tools.py
+++ b/src/timmy/tools.py
@@ -472,26 +472,8 @@ def consult_grok(query: str) -> str:
return response
-def create_full_toolkit(base_dir: str | Path | None = None):
- """Create a full toolkit with all available tools (for the orchestrator).
-
- Includes: web search, file read/write, shell commands, python execution,
- memory search for contextual recall, and Grok consultation.
- """
- if not _AGNO_TOOLS_AVAILABLE:
- # Return None when tools aren't available (tests)
- return None
-
- from timmy.tool_safety import DANGEROUS_TOOLS
-
- toolkit = Toolkit(
- name="full",
- )
- # Set requires_confirmation_tools AFTER construction (avoids agno WARNING
- # about tools not yet registered) but BEFORE register() calls (so each
- # Function gets requires_confirmation=True). Fixes #79.
- toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS)
-
+def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
+ """Register core execution and file tools."""
# Python execution
python_tools = PythonTools()
toolkit.register(python_tools.run_python_code, name="python")
@@ -500,10 +482,7 @@ def create_full_toolkit(base_dir: str | Path | None = None):
shell_tools = ShellTools()
toolkit.register(shell_tools.run_shell_command, name="shell")
- # File operations - use repo_root from settings
- from config import settings
-
- base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
+ # File operations
file_tools = FileTools(base_dir=base_path)
toolkit.register(_make_smart_read_file(file_tools), name="read_file")
toolkit.register(file_tools.save_file, name="write_file")
@@ -512,7 +491,9 @@ def create_full_toolkit(base_dir: str | Path | None = None):
# Calculator — exact arithmetic (never let the LLM guess)
toolkit.register(calculator, name="calculator")
- # Grok consultation — premium frontier reasoning (opt-in)
+
+def _register_grok_tool(toolkit: Toolkit) -> None:
+ """Register Grok consultation tool if available."""
try:
from timmy.backends import grok_available
@@ -523,7 +504,9 @@ def create_full_toolkit(base_dir: str | Path | None = None):
logger.warning("Tool execution failed (Grok registration): %s", exc)
logger.debug("Grok tool not available")
- # Memory search, write, and forget — persistent recall across all channels
+
+def _register_memory_tools(toolkit: Toolkit) -> None:
+ """Register memory search, write, and forget tools."""
try:
from timmy.semantic_memory import memory_forget, memory_read, memory_search, memory_write
@@ -535,7 +518,9 @@ def create_full_toolkit(base_dir: str | Path | None = None):
logger.warning("Tool execution failed (Memory tools registration): %s", exc)
logger.debug("Memory tools not available")
- # Agentic loop — background multi-step task execution
+
+def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
+ """Register agentic loop tool for background multi-step task execution."""
try:
from timmy.agentic_loop import run_agentic_loop
@@ -582,7 +567,9 @@ def create_full_toolkit(base_dir: str | Path | None = None):
logger.warning("Tool execution failed (plan_and_execute registration): %s", exc)
logger.debug("plan_and_execute tool not available")
- # System introspection - query runtime environment (sovereign self-knowledge)
+
+def _register_introspection_tools(toolkit: Toolkit) -> None:
+ """Register system introspection tools for runtime environment queries."""
try:
from timmy.tools_intro import (
check_ollama_health,
@@ -599,7 +586,9 @@ def create_full_toolkit(base_dir: str | Path | None = None):
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
logger.debug("Introspection tools not available")
- # Inter-agent delegation - dispatch tasks to swarm agents
+
+def _register_delegation_tools(toolkit: Toolkit) -> None:
+ """Register inter-agent delegation tools."""
try:
from timmy.tools_delegation import delegate_task, delegate_to_kimi, list_swarm_agents
@@ -610,6 +599,34 @@ def create_full_toolkit(base_dir: str | Path | None = None):
logger.warning("Tool execution failed (Delegation tools registration): %s", exc)
logger.debug("Delegation tools not available")
+
+def create_full_toolkit(base_dir: str | Path | None = None):
+ """Create a full toolkit with all available tools (for the orchestrator).
+
+ Includes: web search, file read/write, shell commands, python execution,
+ memory search for contextual recall, and Grok consultation.
+ """
+ if not _AGNO_TOOLS_AVAILABLE:
+ # Return None when tools aren't available (tests)
+ return None
+
+ from timmy.tool_safety import DANGEROUS_TOOLS
+
+ toolkit = Toolkit(name="full")
+ # Set requires_confirmation_tools AFTER construction (avoids agno WARNING
+ # about tools not yet registered) but BEFORE register() calls (so each
+ # Function gets requires_confirmation=True). Fixes #79.
+ toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS)
+
+    from config import settings  # local import, as in the pre-refactor body
+
+    base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
+
+ _register_core_tools(toolkit, base_path)
+ _register_grok_tool(toolkit)
+ _register_memory_tools(toolkit)
+ _register_agentic_loop_tool(toolkit)
+ _register_introspection_tools(toolkit)
+ _register_delegation_tools(toolkit)
+
# Gitea issue management is now provided by the gitea-mcp server
# (wired in as MCPTools in agent.py, not registered here)
@@ -719,13 +736,9 @@ get_tools_for_persona = get_tools_for_agent
PERSONA_TOOLKITS = AGENT_TOOLKITS
-def get_all_available_tools() -> dict[str, dict]:
- """Get a catalog of all available tools and their descriptions.
-
- Returns:
- Dict mapping tool categories to their tools and descriptions.
- """
- catalog = {
+def _core_tool_catalog() -> dict:
+ """Return core file and execution tools catalog entries."""
+ return {
"shell": {
"name": "Shell Commands",
"description": "Execute shell commands (sandboxed)",
@@ -751,16 +764,39 @@ def get_all_available_tools() -> dict[str, dict]:
"description": "List files in a directory",
"available_in": ["echo", "seer", "forge", "quill", "mace", "helm", "orchestrator"],
},
+ }
+
+
+def _analysis_tool_catalog() -> dict:
+ """Return analysis and calculation tools catalog entries."""
+ return {
"calculator": {
"name": "Calculator",
"description": "Evaluate mathematical expressions with exact results",
"available_in": ["orchestrator"],
},
+ }
+
+
+def _ai_tool_catalog() -> dict:
+ """Return AI assistant and frontier reasoning tools catalog entries."""
+ return {
"consult_grok": {
"name": "Consult Grok",
"description": "Premium frontier reasoning via xAI Grok (opt-in, Lightning-payable)",
"available_in": ["orchestrator"],
},
+ "aider": {
+ "name": "Aider AI Assistant",
+ "description": "Local AI coding assistant using Ollama (qwen3.5:latest or deepseek-coder)",
+ "available_in": ["forge", "orchestrator"],
+ },
+ }
+
+
+def _introspection_tool_catalog() -> dict:
+ """Return system introspection tools catalog entries."""
+ return {
"get_system_info": {
"name": "System Info",
"description": "Introspect runtime environment - discover model, Python version, config",
@@ -776,11 +812,12 @@ def get_all_available_tools() -> dict[str, dict]:
"description": "Check status of memory tiers (hot memory, vault)",
"available_in": ["orchestrator"],
},
- "aider": {
- "name": "Aider AI Assistant",
- "description": "Local AI coding assistant using Ollama (qwen3.5:latest or deepseek-coder)",
- "available_in": ["forge", "orchestrator"],
- },
+ }
+
+
+def _experiment_tool_catalog() -> dict:
+ """Return ML experiment tools catalog entries."""
+ return {
"prepare_experiment": {
"name": "Prepare Experiment",
"description": "Clone autoresearch repo and run data preparation for ML experiments",
@@ -798,6 +835,9 @@ def get_all_available_tools() -> dict[str, dict]:
},
}
+
+def _import_creative_catalogs(catalog: dict) -> None:
+ """Import and merge creative tool catalogs from creative module."""
# ── Git tools ─────────────────────────────────────────────────────────────
try:
from creative.tools.git_tools import GIT_TOOL_CATALOG
@@ -876,4 +916,18 @@ def get_all_available_tools() -> dict[str, dict]:
except ImportError:
pass
+
+def get_all_available_tools() -> dict[str, dict]:
+ """Get a catalog of all available tools and their descriptions.
+
+ Returns:
+ Dict mapping tool categories to their tools and descriptions.
+ """
+ catalog = {}
+ catalog.update(_core_tool_catalog())
+ catalog.update(_analysis_tool_catalog())
+ catalog.update(_ai_tool_catalog())
+ catalog.update(_introspection_tool_catalog())
+ catalog.update(_experiment_tool_catalog())
+ _import_creative_catalogs(catalog)
return catalog
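
The decomposition keeps get_all_available_tools as a thin aggregator: each _*_tool_catalog helper returns an independent dict and dict.update merges them, so a later category silently wins if two helpers ever reuse a key. The shape in miniature:

    def _core_catalog() -> dict:
        return {"shell": {"name": "Shell Commands"}}

    def _analysis_catalog() -> dict:
        return {"calculator": {"name": "Calculator"}}

    def all_tools() -> dict:
        catalog: dict = {}
        for part in (_core_catalog(), _analysis_catalog()):
            catalog.update(part)  # later parts override duplicate keys
        return catalog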
diff --git a/src/timmy/tools_intro/__init__.py b/src/timmy/tools_intro/__init__.py
index b0b6f01b..0efe628a 100644
--- a/src/timmy/tools_intro/__init__.py
+++ b/src/timmy/tools_intro/__init__.py
@@ -6,7 +6,9 @@ being told about it in the system prompt.
import logging
import platform
+import sqlite3
import sys
+from contextlib import closing
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@@ -174,19 +176,16 @@ def get_memory_status() -> dict[str, Any]:
# Tier 3: Semantic memory row count
tier3_info: dict[str, Any] = {"available": False}
try:
- import sqlite3
-
sem_db = repo_root / "data" / "memory.db"
if sem_db.exists():
- conn = sqlite3.connect(str(sem_db))
- row = conn.execute(
- "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
- ).fetchone()
- if row and row[0]:
- count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
- tier3_info["available"] = True
- tier3_info["vector_count"] = count[0] if count else 0
- conn.close()
+ with closing(sqlite3.connect(str(sem_db))) as conn:
+ row = conn.execute(
+ "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
+ ).fetchone()
+ if row and row[0]:
+ count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
+ tier3_info["available"] = True
+ tier3_info["vector_count"] = count[0] if count else 0
except Exception as exc:
logger.debug("Memory status query failed: %s", exc)
pass
@@ -194,26 +193,23 @@ def get_memory_status() -> dict[str, Any]:
# Self-coding journal stats
journal_info: dict[str, Any] = {"available": False}
try:
- import sqlite3 as _sqlite3
-
journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists():
- conn = _sqlite3.connect(str(journal_db))
- conn.row_factory = _sqlite3.Row
- rows = conn.execute(
- "SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
- ).fetchall()
- if rows:
- counts = {r["outcome"]: r["cnt"] for r in rows}
- total = sum(counts.values())
- journal_info = {
- "available": True,
- "total_attempts": total,
- "successes": counts.get("success", 0),
- "failures": counts.get("failure", 0),
- "success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
- }
- conn.close()
+ with closing(sqlite3.connect(str(journal_db))) as conn:
+ conn.row_factory = sqlite3.Row
+ rows = conn.execute(
+ "SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
+ ).fetchall()
+ if rows:
+ counts = {r["outcome"]: r["cnt"] for r in rows}
+ total = sum(counts.values())
+ journal_info = {
+ "available": True,
+ "total_attempts": total,
+ "successes": counts.get("success", 0),
+ "failures": counts.get("failure", 0),
+ "success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
+ }
except Exception as exc:
logger.debug("Journal stats query failed: %s", exc)
pass
diff --git a/tests/timmy/test_confidence.py b/tests/timmy/test_confidence.py
new file mode 100644
index 00000000..cc3f8477
--- /dev/null
+++ b/tests/timmy/test_confidence.py
@@ -0,0 +1,128 @@
+"""Tests for confidence estimation in src/timmy/confidence.py."""
+
+from timmy.confidence import (
+ CERTAINTY_WORDS,
+ HEDGING_WORDS,
+ estimate_confidence,
+)
+
+
+class TestEstimateConfidence:
+ """Test cases for estimate_confidence function."""
+
+ def test_empty_string_returns_zero(self):
+ """Empty string should return 0.0 confidence."""
+ assert estimate_confidence("") == 0.0
+
+ def test_whitespace_only_returns_zero(self):
+ """Whitespace-only string should return 0.0 confidence."""
+ assert estimate_confidence(" ") == 0.0
+
+ def test_normal_factual_response(self):
+ """Factual response should have at least moderate confidence."""
+ result = estimate_confidence("Paris is the capital of France.")
+ assert 0.5 <= result <= 1.0
+ # 6 words doesn't get short-response boost, should be at base
+ assert result >= 0.5
+
+ def test_i_dont_know_gives_very_low_confidence(self):
+ """Direct admission of not knowing should give very low confidence."""
+ result = estimate_confidence("I don't know the answer to that.")
+ assert result <= 0.2
+
+ def test_i_am_not_sure_gives_very_low_confidence(self):
+ """Uncertainty admission should give very low confidence."""
+ result = estimate_confidence("I am not sure about this.")
+ assert result <= 0.2
+
+ def test_hedging_words_reduce_confidence(self):
+ """Hedging words should reduce confidence below base."""
+ base = estimate_confidence("This is the answer.")
+ hedged = estimate_confidence("I think this is the answer.")
+ assert hedged < base
+
+ def test_maybe_reduces_confidence(self):
+ """The word 'maybe' should reduce confidence."""
+ base = estimate_confidence("It will rain tomorrow.")
+ hedged = estimate_confidence("Maybe it will rain tomorrow.")
+ assert hedged < base
+
+ def test_perhaps_reduces_confidence(self):
+ """The word 'perhaps' should reduce confidence."""
+ base = estimate_confidence("The solution is correct.")
+ hedged = estimate_confidence("Perhaps the solution is correct.")
+ assert hedged < base
+
+ def test_certainty_words_increase_confidence(self):
+ """Certainty words should increase confidence above base."""
+ # Use longer sentence to avoid short-response boost confounding
+ base = estimate_confidence("This is a longer sentence with more words.")
+ certain = estimate_confidence(
+ "I definitely know this is a longer sentence with more words."
+ )
+ assert certain > base
+
+ def test_definitely_increases_confidence(self):
+ """The word 'definitely' should increase confidence."""
+ base = estimate_confidence("This will work.")
+ certain = estimate_confidence("This will definitely work.")
+ assert certain > base
+
+ def test_question_reduces_confidence(self):
+ """Questions in response should reduce confidence."""
+ base = estimate_confidence("The value is 10.")
+ questioning = estimate_confidence("The value is 10?")
+ assert questioning < base
+
+ def test_multiple_hedging_words_compound(self):
+ """Multiple hedging words should compound to lower confidence."""
+ text = "I think maybe this could be the answer, but I'm not sure."
+ result = estimate_confidence(text)
+ assert result < 0.4
+
+ def test_output_always_in_valid_range(self):
+ """Output should always be clamped to [0.0, 1.0]."""
+ # Test with text that has many hedging words
+ heavily_hedged = (
+ "I think maybe perhaps possibly I believe this might could be approximately right."
+ )
+ result = estimate_confidence(heavily_hedged)
+ assert 0.0 <= result <= 1.0
+
+ # Test with text that has many certainty words
+ heavily_certain = "I know definitely certainly absolutely without doubt the answer is specifically exactly correct."
+ result = estimate_confidence(heavily_certain)
+ assert 0.0 <= result <= 1.0
+
+ def test_hedging_words_list_populated(self):
+ """HEDGING_WORDS list should contain expected hedging phrases."""
+ assert "i think" in HEDGING_WORDS
+ assert "maybe" in HEDGING_WORDS
+ assert "perhaps" in HEDGING_WORDS
+ assert "not sure" in HEDGING_WORDS
+ assert "possibly" in HEDGING_WORDS
+
+ def test_certainty_words_list_populated(self):
+ """CERTAINTY_WORDS list should contain expected certainty phrases."""
+ assert "i know" in CERTAINTY_WORDS
+ assert "definitely" in CERTAINTY_WORDS
+ assert "certainly" in CERTAINTY_WORDS
+ assert "the answer is" in CERTAINTY_WORDS
+
+ def test_certainty_and_hedging_cancel(self):
+ """Mix of certainty and hedging should balance out near base."""
+ text = "I definitely think this is correct."
+ result = estimate_confidence(text)
+        # Only "definitely" fires ("i think" is split by it), so slightly above base
+ assert 0.3 <= result <= 0.7
+
+ def test_i_have_no_idea_gives_very_low_confidence(self):
+ """I have no idea should give very low confidence."""
+ result = estimate_confidence("I have no idea what you're talking about.")
+ assert result <= 0.2
+
+ def test_short_response_gets_boost(self):
+ """Very short factual responses should get confidence boost."""
+ short = estimate_confidence("42")
+ # Short factual should be higher due to boost
+ assert short > 0.5
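
These tests exercise only pure functions — no fixtures, no database — so the module runs standalone, e.g. pytest tests/timmy/test_confidence.py -q (assuming pytest is the project's test runner).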