[loop-cycle-47] refactor: replace bare sqlite3.connect() with context managers (#148) (#155)

This commit is contained in:
2026-03-15 11:05:39 -04:00
parent 717dba9816
commit 5aea8be223
12 changed files with 410 additions and 493 deletions

View File

@@ -3,6 +3,7 @@
import asyncio
import logging
import sqlite3
from contextlib import closing
from pathlib import Path
from fastapi import APIRouter, Request
@@ -39,56 +40,50 @@ def _query_database(db_path: str) -> dict:
"""Open a database read-only and return all tables with their rows."""
result = {"tables": {}, "error": None}
try:
conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
conn.row_factory = sqlite3.Row
except Exception as exc:
result["error"] = str(exc)
return result
with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as conn:
conn.row_factory = sqlite3.Row
try:
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
).fetchall()
for (table_name,) in tables:
try:
rows = conn.execute(
f"SELECT * FROM [{table_name}] LIMIT {MAX_ROWS}" # noqa: S608
).fetchall()
columns = (
[
desc[0]
for desc in conn.execute(
f"SELECT * FROM [{table_name}] LIMIT 0"
).description
]
if rows
else []
) # noqa: S608
if not columns and rows:
columns = list(rows[0].keys())
elif not columns:
# Get columns even for empty tables
cursor = conn.execute(f"PRAGMA table_info([{table_name}])") # noqa: S608
columns = [r[1] for r in cursor.fetchall()]
count = conn.execute(f"SELECT COUNT(*) FROM [{table_name}]").fetchone()[0] # noqa: S608
result["tables"][table_name] = {
"columns": columns,
"rows": [dict(r) for r in rows],
"total_count": count,
"truncated": count > MAX_ROWS,
}
except Exception as exc:
result["tables"][table_name] = {
"error": str(exc),
"columns": [],
"rows": [],
"total_count": 0,
"truncated": False,
}
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
).fetchall()
for (table_name,) in tables:
try:
rows = conn.execute(
f"SELECT * FROM [{table_name}] LIMIT {MAX_ROWS}" # noqa: S608
).fetchall()
columns = (
[
desc[0]
for desc in conn.execute(
f"SELECT * FROM [{table_name}] LIMIT 0"
).description
]
if rows
else []
) # noqa: S608
if not columns and rows:
columns = list(rows[0].keys())
elif not columns:
# Get columns even for empty tables
cursor = conn.execute(f"PRAGMA table_info([{table_name}])") # noqa: S608
columns = [r[1] for r in cursor.fetchall()]
count = conn.execute(f"SELECT COUNT(*) FROM [{table_name}]").fetchone()[0] # noqa: S608
result["tables"][table_name] = {
"columns": columns,
"rows": [dict(r) for r in rows],
"total_count": count,
"truncated": count > MAX_ROWS,
}
except Exception as exc:
result["tables"][table_name] = {
"error": str(exc),
"columns": [],
"rows": [],
"total_count": 0,
"truncated": False,
}
except Exception as exc:
result["error"] = str(exc)
finally:
conn.close()
return result

View File

@@ -6,8 +6,11 @@ for the Mission Control dashboard.
import asyncio
import logging
import sqlite3
import time
from contextlib import closing
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
@@ -134,13 +137,9 @@ def _check_lightning() -> DependencyStatus:
def _check_sqlite() -> DependencyStatus:
"""Check SQLite database status."""
try:
import sqlite3
from pathlib import Path
db_path = Path(settings.repo_root) / "data" / "timmy.db"
conn = sqlite3.connect(str(db_path))
conn.execute("SELECT 1")
conn.close()
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute("SELECT 1")
return DependencyStatus(
name="SQLite Database",

View File

@@ -3,6 +3,7 @@
import logging
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -101,8 +102,7 @@ class _TaskView:
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
db = _get_db()
try:
with closing(_get_db()) as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
@@ -121,8 +121,6 @@ async def tasks_page(request: Request):
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
finally:
db.close()
return templates.TemplateResponse(
request,
@@ -145,13 +143,10 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
finally:
db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -167,13 +162,10 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
finally:
db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -189,13 +181,10 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
finally:
db.close()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -231,16 +220,13 @@ async def create_task_form(
now = datetime.utcnow().isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
finally:
db.close()
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
@@ -283,16 +269,13 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
finally:
db.close()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -304,16 +287,13 @@ async def _set_status(request: Request, task_id: str, new_status: str):
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
finally:
db.close()
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -339,8 +319,7 @@ async def api_create_task(request: Request):
if priority not in VALID_PRIORITIES:
priority = "normal"
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
@@ -356,8 +335,6 @@ async def api_create_task(request: Request):
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
finally:
db.close()
return JSONResponse(_row_to_dict(row), status_code=201)
@@ -365,11 +342,8 @@ async def api_create_task(request: Request):
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks():
"""List all tasks as JSON."""
db = _get_db()
try:
with closing(_get_db()) as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
finally:
db.close()
return JSONResponse([_row_to_dict(r) for r in rows])
@@ -384,16 +358,13 @@ async def api_update_status(task_id: str, request: Request):
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
finally:
db.close()
if not row:
raise HTTPException(404, "Task not found")
return JSONResponse(_row_to_dict(row))
@@ -402,12 +373,9 @@ async def api_update_status(task_id: str, request: Request):
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_delete_task(task_id: str):
"""Delete a task."""
db = _get_db()
try:
with closing(_get_db()) as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
finally:
db.close()
if cursor.rowcount == 0:
raise HTTPException(404, "Task not found")
return JSONResponse({"success": True, "id": task_id})
@@ -421,8 +389,7 @@ async def api_delete_task(task_id: str):
@router.get("/api/queue/status", response_class=JSONResponse)
async def queue_status(assigned_to: str = "default"):
"""Return queue status for the chat panel's agent status indicator."""
db = _get_db()
try:
with closing(_get_db()) as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
@@ -431,8 +398,6 @@ async def queue_status(assigned_to: str = "default"):
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
finally:
db.close()
if running:
return JSONResponse(

View File

@@ -3,6 +3,7 @@
import logging
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -104,14 +105,11 @@ def _query_wos(db, statuses):
@router.get("/work-orders/queue", response_class=HTMLResponse)
async def work_orders_page(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
pending = _query_wos(db, ["submitted", "triaged"])
active = _query_wos(db, ["approved", "in_progress"])
completed = _query_wos(db, ["completed"])
rejected = _query_wos(db, ["rejected"])
finally:
db.close()
return templates.TemplateResponse(
request,
@@ -148,8 +146,7 @@ async def submit_work_order(
priority = priority if priority in PRIORITIES else "medium"
category = category if category in CATEGORIES else "suggestion"
db = _get_db()
try:
with closing(_get_db()) as db:
db.execute(
"INSERT INTO work_orders (id, title, description, priority, category, submitter, related_files, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
@@ -157,8 +154,6 @@ async def submit_work_order(
)
db.commit()
row = db.execute("SELECT * FROM work_orders WHERE id=?", (wo_id,)).fetchone()
finally:
db.close()
wo = _WOView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/work_order_card.html", {"wo": wo})
@@ -171,11 +166,8 @@ async def submit_work_order(
@router.get("/work-orders/queue/pending", response_class=HTMLResponse)
async def pending_partial(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
wos = _query_wos(db, ["submitted", "triaged"])
finally:
db.close()
if not wos:
return HTMLResponse(
'<div style="color: var(--text-muted); font-size: 0.8rem; padding: 12px 0;">'
@@ -193,11 +185,8 @@ async def pending_partial(request: Request):
@router.get("/work-orders/queue/active", response_class=HTMLResponse)
async def active_partial(request: Request):
db = _get_db()
try:
with closing(_get_db()) as db:
wos = _query_wos(db, ["approved", "in_progress"])
finally:
db.close()
if not wos:
return HTMLResponse(
'<div style="color: var(--text-muted); font-size: 0.8rem; padding: 12px 0;">'
@@ -222,8 +211,7 @@ async def _update_status(request: Request, wo_id: str, new_status: str, **extra)
completed_at = (
datetime.utcnow().isoformat() if new_status in ("completed", "rejected") else None
)
db = _get_db()
try:
with closing(_get_db()) as db:
sets = ["status=?", "completed_at=COALESCE(?, completed_at)"]
vals = [new_status, completed_at]
for col, val in extra.items():
@@ -233,8 +221,6 @@ async def _update_status(request: Request, wo_id: str, new_status: str, **extra)
db.execute(f"UPDATE work_orders SET {', '.join(sets)} WHERE id=?", vals)
db.commit()
row = db.execute("SELECT * FROM work_orders WHERE id=?", (wo_id,)).fetchone()
finally:
db.close()
if not row:
raise HTTPException(404, "Work order not found")
wo = _WOView(_row_to_dict(row))

View File

@@ -10,6 +10,7 @@ import json
import logging
import sqlite3
from collections.abc import Callable, Coroutine
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
@@ -99,14 +100,11 @@ class EventBus:
if self._persistence_db_path is None:
return
self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(self._persistence_db_path))
try:
with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
conn.executescript(_EVENTS_SCHEMA)
conn.commit()
finally:
conn.close()
def _get_persistence_conn(self) -> sqlite3.Connection | None:
"""Get a connection to the persistence database."""
@@ -123,27 +121,26 @@ class EventBus:
if conn is None:
return
try:
task_id = event.data.get("task_id", "")
agent_id = event.data.get("agent_id", "")
conn.execute(
"INSERT OR IGNORE INTO events "
"(id, event_type, source, task_id, agent_id, data, timestamp) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
event.id,
event.type,
event.source,
task_id,
agent_id,
json.dumps(event.data),
event.timestamp,
),
)
conn.commit()
with closing(conn):
task_id = event.data.get("task_id", "")
agent_id = event.data.get("agent_id", "")
conn.execute(
"INSERT OR IGNORE INTO events "
"(id, event_type, source, task_id, agent_id, data, timestamp) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
event.id,
event.type,
event.source,
task_id,
agent_id,
json.dumps(event.data),
event.timestamp,
),
)
conn.commit()
except Exception as exc:
logger.debug("Failed to persist event: %s", exc)
finally:
conn.close()
# ── Replay ───────────────────────────────────────────────────────────
@@ -170,40 +167,39 @@ class EventBus:
return []
try:
conditions = []
params: list = []
with closing(conn):
conditions = []
params: list = []
if event_type:
conditions.append("event_type = ?")
params.append(event_type)
if source:
conditions.append("source = ?")
params.append(source)
if task_id:
conditions.append("task_id = ?")
params.append(task_id)
if event_type:
conditions.append("event_type = ?")
params.append(event_type)
if source:
conditions.append("source = ?")
params.append(source)
if task_id:
conditions.append("task_id = ?")
params.append(task_id)
where = " AND ".join(conditions) if conditions else "1=1"
sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
where = " AND ".join(conditions) if conditions else "1=1"
sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
rows = conn.execute(sql, params).fetchall()
rows = conn.execute(sql, params).fetchall()
return [
Event(
id=row["id"],
type=row["event_type"],
source=row["source"],
data=json.loads(row["data"]) if row["data"] else {},
timestamp=row["timestamp"],
)
for row in rows
]
return [
Event(
id=row["id"],
type=row["event_type"],
source=row["source"],
data=json.loads(row["data"]) if row["data"] else {},
timestamp=row["timestamp"],
)
for row in rows
]
except Exception as exc:
logger.debug("Failed to replay events: %s", exc)
return []
finally:
conn.close()
# ── Subscribe / Publish ──────────────────────────────────────────────

View File

@@ -11,6 +11,7 @@ model roles (student, teacher, judge/PRM) run on dedicated resources.
import logging
import sqlite3
import threading
from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import StrEnum
@@ -105,23 +106,22 @@ class ModelRegistry:
def _load_from_db(self) -> None:
"""Bootstrap cache from SQLite."""
try:
conn = _get_conn()
for row in conn.execute("SELECT * FROM custom_models WHERE active = 1").fetchall():
self._models[row["name"]] = CustomModel(
name=row["name"],
format=ModelFormat(row["format"]),
path=row["path"],
role=ModelRole(row["role"]),
context_window=row["context_window"],
description=row["description"],
registered_at=row["registered_at"],
active=bool(row["active"]),
default_temperature=row["default_temperature"],
max_tokens=row["max_tokens"],
)
for row in conn.execute("SELECT * FROM agent_model_assignments").fetchall():
self._agent_assignments[row["agent_id"]] = row["model_name"]
conn.close()
with closing(_get_conn()) as conn:
for row in conn.execute("SELECT * FROM custom_models WHERE active = 1").fetchall():
self._models[row["name"]] = CustomModel(
name=row["name"],
format=ModelFormat(row["format"]),
path=row["path"],
role=ModelRole(row["role"]),
context_window=row["context_window"],
description=row["description"],
registered_at=row["registered_at"],
active=bool(row["active"]),
default_temperature=row["default_temperature"],
max_tokens=row["max_tokens"],
)
for row in conn.execute("SELECT * FROM agent_model_assignments").fetchall():
self._agent_assignments[row["agent_id"]] = row["model_name"]
except Exception as exc:
logger.warning("Failed to load model registry from DB: %s", exc)
@@ -130,29 +130,28 @@ class ModelRegistry:
def register(self, model: CustomModel) -> CustomModel:
"""Register a new custom model."""
with self._lock:
conn = _get_conn()
conn.execute(
"""
INSERT OR REPLACE INTO custom_models
(name, format, path, role, context_window, description,
registered_at, active, default_temperature, max_tokens)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
model.name,
model.format.value,
model.path,
model.role.value,
model.context_window,
model.description,
model.registered_at,
int(model.active),
model.default_temperature,
model.max_tokens,
),
)
conn.commit()
conn.close()
with closing(_get_conn()) as conn:
conn.execute(
"""
INSERT OR REPLACE INTO custom_models
(name, format, path, role, context_window, description,
registered_at, active, default_temperature, max_tokens)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
model.name,
model.format.value,
model.path,
model.role.value,
model.context_window,
model.description,
model.registered_at,
int(model.active),
model.default_temperature,
model.max_tokens,
),
)
conn.commit()
self._models[model.name] = model
logger.info("Registered model: %s (%s)", model.name, model.format.value)
return model
@@ -162,11 +161,10 @@ class ModelRegistry:
with self._lock:
if name not in self._models:
return False
conn = _get_conn()
conn.execute("DELETE FROM custom_models WHERE name = ?", (name,))
conn.execute("DELETE FROM agent_model_assignments WHERE model_name = ?", (name,))
conn.commit()
conn.close()
with closing(_get_conn()) as conn:
conn.execute("DELETE FROM custom_models WHERE name = ?", (name,))
conn.execute("DELETE FROM agent_model_assignments WHERE model_name = ?", (name,))
conn.commit()
del self._models[name]
# Remove any agent assignments using this model
self._agent_assignments = {
@@ -193,13 +191,12 @@ class ModelRegistry:
return False
with self._lock:
model.active = active
conn = _get_conn()
conn.execute(
"UPDATE custom_models SET active = ? WHERE name = ?",
(int(active), name),
)
conn.commit()
conn.close()
with closing(_get_conn()) as conn:
conn.execute(
"UPDATE custom_models SET active = ? WHERE name = ?",
(int(active), name),
)
conn.commit()
return True
# ── Agent-model assignments ────────────────────────────────────────────
@@ -210,17 +207,16 @@ class ModelRegistry:
return False
with self._lock:
now = datetime.now(UTC).isoformat()
conn = _get_conn()
conn.execute(
"""
INSERT OR REPLACE INTO agent_model_assignments
(agent_id, model_name, assigned_at)
VALUES (?, ?, ?)
""",
(agent_id, model_name, now),
)
conn.commit()
conn.close()
with closing(_get_conn()) as conn:
conn.execute(
"""
INSERT OR REPLACE INTO agent_model_assignments
(agent_id, model_name, assigned_at)
VALUES (?, ?, ?)
""",
(agent_id, model_name, now),
)
conn.commit()
self._agent_assignments[agent_id] = model_name
logger.info("Assigned model %s to agent %s", model_name, agent_id)
return True
@@ -230,13 +226,12 @@ class ModelRegistry:
with self._lock:
if agent_id not in self._agent_assignments:
return False
conn = _get_conn()
conn.execute(
"DELETE FROM agent_model_assignments WHERE agent_id = ?",
(agent_id,),
)
conn.commit()
conn.close()
with closing(_get_conn()) as conn:
conn.execute(
"DELETE FROM agent_model_assignments WHERE agent_id = ?",
(agent_id,),
)
conn.commit()
del self._agent_assignments[agent_id]
return True

View File

@@ -13,6 +13,7 @@ Default is always True. The owner changes this intentionally.
import sqlite3
import uuid
from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -96,80 +97,73 @@ def create_item(
created_at=datetime.now(UTC),
status="pending",
)
conn = _get_conn(db_path)
conn.execute(
"""
INSERT INTO approval_items
(id, title, description, proposed_action, impact, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
item.id,
item.title,
item.description,
item.proposed_action,
item.impact,
item.created_at.isoformat(),
item.status,
),
)
conn.commit()
conn.close()
with closing(_get_conn(db_path)) as conn:
conn.execute(
"""
INSERT INTO approval_items
(id, title, description, proposed_action, impact, created_at, status)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
item.id,
item.title,
item.description,
item.proposed_action,
item.impact,
item.created_at.isoformat(),
item.status,
),
)
conn.commit()
return item
def list_pending(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all pending approval items, newest first."""
conn = _get_conn(db_path)
rows = conn.execute(
"SELECT * FROM approval_items WHERE status = 'pending' ORDER BY created_at DESC"
).fetchall()
conn.close()
with closing(_get_conn(db_path)) as conn:
rows = conn.execute(
"SELECT * FROM approval_items WHERE status = 'pending' ORDER BY created_at DESC"
).fetchall()
return [_row_to_item(r) for r in rows]
def list_all(db_path: Path = _DEFAULT_DB) -> list[ApprovalItem]:
"""Return all approval items regardless of status, newest first."""
conn = _get_conn(db_path)
rows = conn.execute("SELECT * FROM approval_items ORDER BY created_at DESC").fetchall()
conn.close()
with closing(_get_conn(db_path)) as conn:
rows = conn.execute("SELECT * FROM approval_items ORDER BY created_at DESC").fetchall()
return [_row_to_item(r) for r in rows]
def get_item(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
conn = _get_conn(db_path)
row = conn.execute("SELECT * FROM approval_items WHERE id = ?", (item_id,)).fetchone()
conn.close()
with closing(_get_conn(db_path)) as conn:
row = conn.execute("SELECT * FROM approval_items WHERE id = ?", (item_id,)).fetchone()
return _row_to_item(row) if row else None
def approve(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
"""Mark an approval item as approved."""
conn = _get_conn(db_path)
conn.execute("UPDATE approval_items SET status = 'approved' WHERE id = ?", (item_id,))
conn.commit()
conn.close()
with closing(_get_conn(db_path)) as conn:
conn.execute("UPDATE approval_items SET status = 'approved' WHERE id = ?", (item_id,))
conn.commit()
return get_item(item_id, db_path)
def reject(item_id: str, db_path: Path = _DEFAULT_DB) -> ApprovalItem | None:
"""Mark an approval item as rejected."""
conn = _get_conn(db_path)
conn.execute("UPDATE approval_items SET status = 'rejected' WHERE id = ?", (item_id,))
conn.commit()
conn.close()
with closing(_get_conn(db_path)) as conn:
conn.execute("UPDATE approval_items SET status = 'rejected' WHERE id = ?", (item_id,))
conn.commit()
return get_item(item_id, db_path)
def expire_old(db_path: Path = _DEFAULT_DB) -> int:
"""Auto-expire pending items older than EXPIRY_DAYS. Returns count removed."""
cutoff = (datetime.now(UTC) - timedelta(days=_EXPIRY_DAYS)).isoformat()
conn = _get_conn(db_path)
cursor = conn.execute(
"DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
(cutoff,),
)
conn.commit()
count = cursor.rowcount
conn.close()
with closing(_get_conn(db_path)) as conn:
cursor = conn.execute(
"DELETE FROM approval_items WHERE status = 'pending' AND created_at < ?",
(cutoff,),
)
conn.commit()
count = cursor.rowcount
return count

View File

@@ -10,6 +10,7 @@ regenerates the briefing every 6 hours.
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -74,28 +75,26 @@ def _get_cache_conn(db_path: Path = _DEFAULT_DB) -> sqlite3.Connection:
def _save_briefing(briefing: Briefing, db_path: Path = _DEFAULT_DB) -> None:
conn = _get_cache_conn(db_path)
conn.execute(
"""
INSERT INTO briefings (generated_at, period_start, period_end, summary)
VALUES (?, ?, ?, ?)
""",
(
briefing.generated_at.isoformat(),
briefing.period_start.isoformat(),
briefing.period_end.isoformat(),
briefing.summary,
),
)
conn.commit()
conn.close()
with closing(_get_cache_conn(db_path)) as conn:
conn.execute(
"""
INSERT INTO briefings (generated_at, period_start, period_end, summary)
VALUES (?, ?, ?, ?)
""",
(
briefing.generated_at.isoformat(),
briefing.period_start.isoformat(),
briefing.period_end.isoformat(),
briefing.summary,
),
)
conn.commit()
def _load_latest(db_path: Path = _DEFAULT_DB) -> Briefing | None:
"""Load the most-recently cached briefing, or None if there is none."""
conn = _get_cache_conn(db_path)
row = conn.execute("SELECT * FROM briefings ORDER BY generated_at DESC LIMIT 1").fetchone()
conn.close()
with closing(_get_cache_conn(db_path)) as conn:
row = conn.execute("SELECT * FROM briefings ORDER BY generated_at DESC LIMIT 1").fetchone()
if row is None:
return None
return Briefing(
@@ -129,27 +128,25 @@ def _gather_swarm_summary(since: datetime) -> str:
return "No swarm activity recorded yet."
try:
conn = sqlite3.connect(str(swarm_db))
conn.row_factory = sqlite3.Row
with closing(sqlite3.connect(str(swarm_db))) as conn:
conn.row_factory = sqlite3.Row
since_iso = since.isoformat()
since_iso = since.isoformat()
completed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'completed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
completed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'completed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
failed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'failed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
failed = conn.execute(
"SELECT COUNT(*) as c FROM tasks WHERE status = 'failed' AND created_at > ?",
(since_iso,),
).fetchone()["c"]
agents = conn.execute(
"SELECT COUNT(*) as c FROM agents WHERE registered_at > ?",
(since_iso,),
).fetchone()["c"]
conn.close()
agents = conn.execute(
"SELECT COUNT(*) as c FROM agents WHERE registered_at > ?",
(since_iso,),
).fetchone()["c"]
parts = []
if completed:

View File

@@ -25,6 +25,7 @@ import os
import shutil
import sqlite3
import uuid
from contextlib import closing
from datetime import datetime
from pathlib import Path
@@ -163,37 +164,36 @@ def _bridge_to_work_order(title: str, body: str, category: str) -> None:
try:
db_path = Path(settings.repo_root) / "data" / "work_orders.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
conn.close()
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute(
"""CREATE TABLE IF NOT EXISTS work_orders (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT DEFAULT '',
priority TEXT DEFAULT 'medium',
category TEXT DEFAULT 'suggestion',
submitter TEXT DEFAULT 'dashboard',
related_files TEXT DEFAULT '',
status TEXT DEFAULT 'submitted',
result TEXT DEFAULT '',
rejection_reason TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now')),
completed_at TEXT
)"""
)
conn.execute(
"INSERT INTO work_orders (id, title, description, category, submitter, created_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(
str(uuid.uuid4()),
title,
body,
category,
"timmy-thinking",
datetime.utcnow().isoformat(),
),
)
conn.commit()
except Exception as exc:
logger.debug("Work order bridge failed: %s", exc)

View File

@@ -15,6 +15,7 @@ import hashlib
import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
@@ -113,20 +114,19 @@ class SemanticMemory:
def _init_db(self) -> None:
"""Initialize SQLite with vector storage."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(self.db_path))
conn.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
content TEXT NOT NULL,
embedding TEXT NOT NULL,
created_at TEXT NOT NULL,
source_hash TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
conn.commit()
conn.close()
with closing(sqlite3.connect(str(self.db_path))) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
content TEXT NOT NULL,
embedding TEXT NOT NULL,
created_at TEXT NOT NULL,
source_hash TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_chunks_source ON chunks(source)")
conn.commit()
def index_file(self, filepath: Path) -> int:
"""Index a single file into semantic memory."""
@@ -136,39 +136,37 @@ class SemanticMemory:
content = filepath.read_text()
file_hash = hashlib.md5(content.encode()).hexdigest()
# Check if already indexed with same hash
conn = sqlite3.connect(str(self.db_path))
cursor = conn.execute(
"SELECT source_hash FROM chunks WHERE source = ? LIMIT 1", (str(filepath),)
)
existing = cursor.fetchone()
if existing and existing[0] == file_hash:
conn.close()
return 0 # Already indexed
# Delete old chunks for this file
conn.execute("DELETE FROM chunks WHERE source = ?", (str(filepath),))
# Split into chunks (paragraphs)
chunks = self._split_into_chunks(content)
# Index each chunk
now = datetime.now(UTC).isoformat()
for i, chunk_text in enumerate(chunks):
if len(chunk_text.strip()) < 20: # Skip tiny chunks
continue
chunk_id = f"{filepath.stem}_{i}"
embedding = embed_text(chunk_text)
conn.execute(
"""INSERT INTO chunks (id, source, content, embedding, created_at, source_hash)
VALUES (?, ?, ?, ?, ?, ?)""",
(chunk_id, str(filepath), chunk_text, json.dumps(embedding), now, file_hash),
with closing(sqlite3.connect(str(self.db_path))) as conn:
# Check if already indexed with same hash
cursor = conn.execute(
"SELECT source_hash FROM chunks WHERE source = ? LIMIT 1", (str(filepath),)
)
existing = cursor.fetchone()
if existing and existing[0] == file_hash:
return 0 # Already indexed
conn.commit()
conn.close()
# Delete old chunks for this file
conn.execute("DELETE FROM chunks WHERE source = ?", (str(filepath),))
# Split into chunks (paragraphs)
chunks = self._split_into_chunks(content)
# Index each chunk
now = datetime.now(UTC).isoformat()
for i, chunk_text in enumerate(chunks):
if len(chunk_text.strip()) < 20: # Skip tiny chunks
continue
chunk_id = f"{filepath.stem}_{i}"
embedding = embed_text(chunk_text)
conn.execute(
"""INSERT INTO chunks (id, source, content, embedding, created_at, source_hash)
VALUES (?, ?, ?, ?, ?, ?)""",
(chunk_id, str(filepath), chunk_text, json.dumps(embedding), now, file_hash),
)
conn.commit()
logger.info("SemanticMemory: Indexed %s (%d chunks)", filepath.name, len(chunks))
return len(chunks)
@@ -222,13 +220,11 @@ class SemanticMemory:
"""Search for relevant memory chunks."""
query_embedding = embed_text(query)
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
with closing(sqlite3.connect(str(self.db_path))) as conn:
conn.row_factory = sqlite3.Row
# Get all chunks (in production, use vector index)
rows = conn.execute("SELECT source, content, embedding FROM chunks").fetchall()
conn.close()
# Get all chunks (in production, use vector index)
rows = conn.execute("SELECT source, content, embedding FROM chunks").fetchall()
# Calculate similarities
scored = []
@@ -268,10 +264,9 @@ class SemanticMemory:
def stats(self) -> dict:
"""Get indexing statistics."""
conn = sqlite3.connect(str(self.db_path))
cursor = conn.execute("SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks")
total_chunks, total_files = cursor.fetchone()
conn.close()
with closing(sqlite3.connect(str(self.db_path))) as conn:
cursor = conn.execute("SELECT COUNT(*), COUNT(DISTINCT source) FROM chunks")
total_chunks, total_files = cursor.fetchone()
return {
"total_chunks": total_chunks,

View File

@@ -21,6 +21,7 @@ import logging
import random
import sqlite3
import uuid
from contextlib import closing
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from difflib import SequenceMatcher
@@ -289,19 +290,17 @@ class ThinkingEngine:
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""
conn = _get_conn(self._db_path)
rows = conn.execute(
"SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
conn.close()
with closing(_get_conn(self._db_path)) as conn:
rows = conn.execute(
"SELECT * FROM thoughts ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_thought(r) for r in rows]
def get_thought(self, thought_id: str) -> Thought | None:
"""Retrieve a single thought by ID."""
conn = _get_conn(self._db_path)
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
conn.close()
with closing(_get_conn(self._db_path)) as conn:
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (thought_id,)).fetchone()
return _row_to_thought(row) if row else None
def get_thought_chain(self, thought_id: str, max_depth: int = 20) -> list[Thought]:
@@ -311,26 +310,24 @@ class ThinkingEngine:
"""
chain = []
current_id: str | None = thought_id
conn = _get_conn(self._db_path)
for _ in range(max_depth):
if not current_id:
break
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
if not row:
break
chain.append(_row_to_thought(row))
current_id = row["parent_id"]
with closing(_get_conn(self._db_path)) as conn:
for _ in range(max_depth):
if not current_id:
break
row = conn.execute("SELECT * FROM thoughts WHERE id = ?", (current_id,)).fetchone()
if not row:
break
chain.append(_row_to_thought(row))
current_id = row["parent_id"]
conn.close()
chain.reverse() # Chronological order
return chain
def count_thoughts(self) -> int:
"""Return total number of stored thoughts."""
conn = _get_conn(self._db_path)
count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
conn.close()
with closing(_get_conn(self._db_path)) as conn:
count = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
return count
def prune_old_thoughts(self, keep_days: int = 90, keep_min: int = 200) -> int:
@@ -338,25 +335,23 @@ class ThinkingEngine:
Returns the number of deleted rows.
"""
conn = _get_conn(self._db_path)
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
with closing(_get_conn(self._db_path)) as conn:
try:
total = conn.execute("SELECT COUNT(*) as c FROM thoughts").fetchone()["c"]
if total <= keep_min:
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
cutoff = (datetime.now(UTC) - timedelta(days=keep_days)).isoformat()
cursor = conn.execute(
"DELETE FROM thoughts WHERE created_at < ? AND id NOT IN "
"(SELECT id FROM thoughts ORDER BY created_at DESC LIMIT ?)",
(cutoff, keep_min),
)
deleted = cursor.rowcount
conn.commit()
return deleted
except Exception as exc:
logger.warning("Thought pruning failed: %s", exc)
return 0
finally:
conn.close()
# ── Private helpers ──────────────────────────────────────────────────
@@ -576,12 +571,11 @@ class ThinkingEngine:
# Thought count today (cheap DB query)
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
conn = _get_conn(self._db_path)
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
conn.close()
with closing(_get_conn(self._db_path)) as conn:
count = conn.execute(
"SELECT COUNT(*) as c FROM thoughts WHERE created_at >= ?",
(today_start.isoformat(),),
).fetchone()["c"]
parts.append(f"Thoughts today: {count}")
except Exception as exc:
logger.debug("Thought count query failed: %s", exc)
@@ -934,16 +928,21 @@ class ThinkingEngine:
created_at=datetime.now(UTC).isoformat(),
)
conn = _get_conn(self._db_path)
conn.execute(
"""
INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(thought.id, thought.content, thought.seed_type, thought.parent_id, thought.created_at),
)
conn.commit()
conn.close()
with closing(_get_conn(self._db_path)) as conn:
conn.execute(
"""
INSERT INTO thoughts (id, content, seed_type, parent_id, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
thought.id,
thought.content,
thought.seed_type,
thought.parent_id,
thought.created_at,
),
)
conn.commit()
return thought
def _log_event(self, thought: Thought) -> None:

View File

@@ -6,7 +6,9 @@ being told about it in the system prompt.
import logging
import platform
import sqlite3
import sys
from contextlib import closing
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
@@ -174,19 +176,16 @@ def get_memory_status() -> dict[str, Any]:
# Tier 3: Semantic memory row count
tier3_info: dict[str, Any] = {"available": False}
try:
import sqlite3
sem_db = repo_root / "data" / "memory.db"
if sem_db.exists():
conn = sqlite3.connect(str(sem_db))
row = conn.execute(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
).fetchone()
if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
tier3_info["available"] = True
tier3_info["vector_count"] = count[0] if count else 0
conn.close()
with closing(sqlite3.connect(str(sem_db))) as conn:
row = conn.execute(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='chunks'"
).fetchone()
if row and row[0]:
count = conn.execute("SELECT COUNT(*) FROM chunks").fetchone()
tier3_info["available"] = True
tier3_info["vector_count"] = count[0] if count else 0
except Exception as exc:
logger.debug("Memory status query failed: %s", exc)
pass
@@ -194,26 +193,23 @@ def get_memory_status() -> dict[str, Any]:
# Self-coding journal stats
journal_info: dict[str, Any] = {"available": False}
try:
import sqlite3 as _sqlite3
journal_db = repo_root / "data" / "self_coding.db"
if journal_db.exists():
conn = _sqlite3.connect(str(journal_db))
conn.row_factory = _sqlite3.Row
rows = conn.execute(
"SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
).fetchall()
if rows:
counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values())
journal_info = {
"available": True,
"total_attempts": total,
"successes": counts.get("success", 0),
"failures": counts.get("failure", 0),
"success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
}
conn.close()
with closing(sqlite3.connect(str(journal_db))) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT outcome, COUNT(*) as cnt FROM modification_journal GROUP BY outcome"
).fetchall()
if rows:
counts = {r["outcome"]: r["cnt"] for r in rows}
total = sum(counts.values())
journal_info = {
"available": True,
"total_attempts": total,
"successes": counts.get("success", 0),
"failures": counts.get("failure", 0),
"success_rate": round(counts.get("success", 0) / total, 2) if total else 0,
}
except Exception as exc:
logger.debug("Journal stats query failed: %s", exc)
pass