Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
bee62403fd fix: add graceful degradation for DB errors in tasks.py
Some checks failed
Tests / lint (pull_request) Failing after 3s
Tests / test (pull_request) Has been skipped
Wrap all _get_db() calls with sqlite3.OperationalError handling so
endpoints degrade gracefully when the task DB is locked or missing:
- Read endpoints (page, partials, list, queue status) return empty data
- Write endpoints (create, update, delete) return 503

Fixes #943

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 19:03:02 -04:00
5 changed files with 230 additions and 318 deletions

View File

@@ -50,7 +50,6 @@ sounddevice = { version = ">=0.4.6", optional = true }
sentence-transformers = { version = ">=2.0.0", optional = true }
numpy = { version = ">=1.24.0", optional = true }
requests = { version = ">=2.31.0", optional = true }
trafilatura = { version = ">=1.6.0", optional = true }
GitPython = { version = ">=3.1.40", optional = true }
pytest = { version = ">=8.0.0", optional = true }
pytest-asyncio = { version = ">=0.24.0", optional = true }
@@ -68,7 +67,6 @@ voice = ["pyttsx3", "openai-whisper", "piper-tts", "sounddevice"]
celery = ["celery"]
embeddings = ["sentence-transformers", "numpy"]
git = ["GitPython"]
research = ["requests", "trafilatura"]
dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"]
[tool.poetry.group.dev.dependencies]

View File

@@ -104,25 +104,29 @@ class _TaskView:
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
try:
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
pending, active, completed = [], [], []
return templates.TemplateResponse(
request,
@@ -146,10 +150,14 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -166,10 +174,14 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -186,10 +198,14 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
try:
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return HTMLResponse('<div class="empty-column">Database unavailable</div>')
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
@@ -225,13 +241,17 @@ async def create_task_form(
now = datetime.now(UTC).isoformat()
priority = priority if priority in VALID_PRIORITIES else "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(task_id, title, description, priority, assigned_to, now),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
task = _TaskView(_row_to_dict(row))
return templates.TemplateResponse(request, "partials/task_card.html", {"task": task})
@@ -280,13 +300,17 @@ async def modify_task(
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",
(title, description, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -298,13 +322,17 @@ async def _set_status(request: Request, task_id: str, new_status: str):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
task = _TaskView(_row_to_dict(row))
@@ -330,22 +358,26 @@ async def api_create_task(request: Request):
if priority not in VALID_PRIORITIES:
priority = "normal"
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
task_id,
title,
body.get("description", ""),
priority,
body.get("assigned_to", ""),
body.get("created_by", "operator"),
now,
),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"INSERT INTO tasks (id, title, description, priority, assigned_to, created_by, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(
task_id,
title,
body.get("description", ""),
priority,
body.get("assigned_to", ""),
body.get("created_by", "operator"),
now,
),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
return JSONResponse(_row_to_dict(row), status_code=201)
@@ -353,8 +385,12 @@ async def api_create_task(request: Request):
@router.get("/api/tasks", response_class=JSONResponse)
async def api_list_tasks():
"""List all tasks as JSON."""
with _get_db() as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
try:
with _get_db() as db:
rows = db.execute("SELECT * FROM tasks ORDER BY created_at DESC").fetchall()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return JSONResponse([], status_code=200)
return JSONResponse([_row_to_dict(r) for r in rows])
@@ -369,13 +405,17 @@ async def api_update_status(task_id: str, request: Request):
completed_at = (
datetime.now(UTC).isoformat() if new_status in ("completed", "vetoed", "failed") else None
)
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
try:
with _get_db() as db:
db.execute(
"UPDATE tasks SET status=?, completed_at=COALESCE(?, completed_at) WHERE id=?",
(new_status, completed_at, task_id),
)
db.commit()
row = db.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if not row:
raise HTTPException(404, "Task not found")
return JSONResponse(_row_to_dict(row))
@@ -384,9 +424,13 @@ async def api_update_status(task_id: str, request: Request):
@router.delete("/api/tasks/{task_id}", response_class=JSONResponse)
async def api_delete_task(task_id: str):
"""Delete a task."""
with _get_db() as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
try:
with _get_db() as db:
cursor = db.execute("DELETE FROM tasks WHERE id=?", (task_id,))
db.commit()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
raise HTTPException(status_code=503, detail="Task database unavailable") from exc
if cursor.rowcount == 0:
raise HTTPException(404, "Task not found")
return JSONResponse({"success": True, "id": task_id})
@@ -400,15 +444,19 @@ async def api_delete_task(task_id: str):
@router.get("/api/queue/status", response_class=JSONResponse)
async def queue_status(assigned_to: str = "default"):
"""Return queue status for the chat panel's agent status indicator."""
with _get_db() as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
).fetchone()
ahead = db.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
try:
with _get_db() as db:
running = db.execute(
"SELECT * FROM tasks WHERE status='running' AND assigned_to=? LIMIT 1",
(assigned_to,),
).fetchone()
ahead = db.execute(
"SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved') AND assigned_to=?",
(assigned_to,),
).fetchone()
except sqlite3.OperationalError as exc:
logger.warning("Task DB unavailable: %s", exc)
return JSONResponse({"is_working": False, "current_task": None, "tasks_ahead": 0})
if running:
return JSONResponse(

View File

@@ -473,69 +473,6 @@ def consult_grok(query: str) -> str:
return response
def web_fetch(url: str, max_tokens: int = 4000) -> str:
"""Fetch a web page and return its main text content.
Downloads the URL, extracts readable text using trafilatura, and
truncates to a token budget. Use this to read full articles, docs,
or blog posts that web_search only returns snippets for.
Args:
url: The URL to fetch (must start with http:// or https://).
max_tokens: Maximum approximate token budget (default 4000).
Text is truncated to max_tokens * 4 characters.
Returns:
Extracted text content, or an error message on failure.
"""
if not url or not url.startswith(("http://", "https://")):
return f"Error: invalid URL — must start with http:// or https://: {url!r}"
try:
import requests as _requests
except ImportError:
return "Error: 'requests' package is not installed. Install with: pip install requests"
try:
import trafilatura
except ImportError:
return (
"Error: 'trafilatura' package is not installed. Install with: pip install trafilatura"
)
try:
resp = _requests.get(
url,
timeout=15,
headers={"User-Agent": "TimmyResearchBot/1.0"},
)
resp.raise_for_status()
except _requests.exceptions.Timeout:
return f"Error: request timed out after 15 seconds for {url}"
except _requests.exceptions.HTTPError as exc:
return f"Error: HTTP {exc.response.status_code} for {url}"
except _requests.exceptions.RequestException as exc:
return f"Error: failed to fetch {url}{exc}"
text = trafilatura.extract(resp.text, include_tables=True, include_links=True)
if not text:
return f"Error: could not extract readable content from {url}"
char_budget = max_tokens * 4
if len(text) > char_budget:
text = text[:char_budget] + f"\n\n[…truncated to ~{max_tokens} tokens]"
return text
def _register_web_fetch_tool(toolkit: Toolkit) -> None:
"""Register the web_fetch tool for full-page content extraction."""
try:
toolkit.register(web_fetch, name="web_fetch")
except Exception as exc:
logger.warning("Tool execution failed (web_fetch registration): %s", exc)
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
"""Register core execution and file tools."""
# Python execution
@@ -735,7 +672,6 @@ def create_full_toolkit(base_dir: str | Path | None = None):
base_path = Path(base_dir) if base_dir else Path(settings.repo_root)
_register_core_tools(toolkit, base_path)
_register_web_fetch_tool(toolkit)
_register_grok_tool(toolkit)
_register_memory_tools(toolkit)
_register_agentic_loop_tool(toolkit)
@@ -893,11 +829,6 @@ def _analysis_tool_catalog() -> dict:
"description": "Evaluate mathematical expressions with exact results",
"available_in": ["orchestrator"],
},
"web_fetch": {
"name": "Web Fetch",
"description": "Fetch a web page and extract clean readable text (trafilatura)",
"available_in": ["orchestrator"],
},
}

View File

@@ -3,6 +3,99 @@
Verifies task CRUD operations and the dashboard page rendering.
"""
import sqlite3
from unittest.mock import patch
# ---------------------------------------------------------------------------
# DB error handling tests
# ---------------------------------------------------------------------------
_DB_ERROR = sqlite3.OperationalError("database is locked")
def test_tasks_page_degrades_on_db_error(client):
"""GET /tasks renders empty columns when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/tasks")
assert response.status_code == 200
assert "TASK QUEUE" in response.text
def test_pending_partial_degrades_on_db_error(client):
"""GET /tasks/pending returns fallback HTML when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/tasks/pending")
assert response.status_code == 200
assert "Database unavailable" in response.text
def test_active_partial_degrades_on_db_error(client):
"""GET /tasks/active returns fallback HTML when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/tasks/active")
assert response.status_code == 200
assert "Database unavailable" in response.text
def test_completed_partial_degrades_on_db_error(client):
"""GET /tasks/completed returns fallback HTML when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/tasks/completed")
assert response.status_code == 200
assert "Database unavailable" in response.text
def test_api_create_task_503_on_db_error(client):
"""POST /api/tasks returns 503 when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.post("/api/tasks", json={"title": "Test"})
assert response.status_code == 503
def test_api_list_tasks_empty_on_db_error(client):
"""GET /api/tasks returns empty list when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/api/tasks")
assert response.status_code == 200
assert response.json() == []
def test_queue_status_degrades_on_db_error(client):
"""GET /api/queue/status returns idle status when DB is unavailable."""
with patch(
"dashboard.routes.tasks._get_db",
side_effect=_DB_ERROR,
):
response = client.get("/api/queue/status")
assert response.status_code == 200
data = response.json()
assert data["is_working"] is False
assert data["current_task"] is None
# ---------------------------------------------------------------------------
# Existing tests
# ---------------------------------------------------------------------------
def test_tasks_page_returns_200(client):
response = client.get("/tasks")

View File

@@ -1,158 +0,0 @@
"""Unit tests for the web_fetch tool in timmy.tools."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from timmy.tools import web_fetch
class TestWebFetch:
"""Tests for web_fetch function."""
def test_invalid_url_no_scheme(self):
"""URLs without http(s) scheme are rejected."""
result = web_fetch("example.com")
assert "Error: invalid URL" in result
def test_invalid_url_empty(self):
"""Empty URL is rejected."""
result = web_fetch("")
assert "Error: invalid URL" in result
def test_invalid_url_ftp(self):
"""Non-HTTP schemes are rejected."""
result = web_fetch("ftp://example.com")
assert "Error: invalid URL" in result
@patch("timmy.tools.trafilatura", create=True)
@patch("timmy.tools._requests", create=True)
def test_successful_fetch(self, mock_requests, mock_trafilatura):
"""Happy path: fetch + extract returns text."""
# We need to patch at import level inside the function
mock_resp = MagicMock()
mock_resp.text = "<html><body><p>Hello world</p></body></html>"
with patch.dict(
"sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}
):
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = "Hello world"
result = web_fetch("https://example.com")
assert result == "Hello world"
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
def test_truncation(self):
"""Long text is truncated to max_tokens * 4 chars."""
import sys
mock_trafilatura = sys.modules["trafilatura"]
mock_requests = sys.modules["requests"]
long_text = "a" * 20000
mock_resp = MagicMock()
mock_resp.text = "<html><body>" + long_text + "</body></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = long_text
result = web_fetch("https://example.com", max_tokens=100)
# 100 tokens * 4 chars = 400 chars max
assert len(result) < 500
assert "[…truncated" in result
@patch.dict("sys.modules", {"requests": MagicMock(), "trafilatura": MagicMock()})
def test_extraction_failure(self):
"""Returns error when trafilatura can't extract text."""
import sys
mock_trafilatura = sys.modules["trafilatura"]
mock_requests = sys.modules["requests"]
mock_resp = MagicMock()
mock_resp.text = "<html></html>"
mock_requests.get.return_value = mock_resp
mock_requests.exceptions = _make_exceptions()
mock_trafilatura.extract.return_value = None
result = web_fetch("https://example.com")
assert "Error: could not extract" in result
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
def test_timeout(self):
"""Timeout errors are handled gracefully."""
mock_requests = MagicMock()
exc_mod = _make_exceptions()
mock_requests.exceptions = exc_mod
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
with patch.dict("sys.modules", {"requests": mock_requests}):
result = web_fetch("https://example.com")
assert "timed out" in result
@patch.dict("sys.modules", {"trafilatura": MagicMock()})
def test_http_error(self):
"""HTTP errors (404, 500, etc.) are handled gracefully."""
mock_requests = MagicMock()
exc_mod = _make_exceptions()
mock_requests.exceptions = exc_mod
mock_response = MagicMock()
mock_response.status_code = 404
mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError(
response=mock_response
)
with patch.dict("sys.modules", {"requests": mock_requests}):
result = web_fetch("https://example.com/nope")
assert "404" in result
def test_missing_requests(self):
"""Graceful error when requests not installed."""
with patch.dict("sys.modules", {"requests": None}):
result = web_fetch("https://example.com")
assert "requests" in result and "not installed" in result
def test_missing_trafilatura(self):
"""Graceful error when trafilatura not installed."""
mock_requests = MagicMock()
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}):
result = web_fetch("https://example.com")
assert "trafilatura" in result and "not installed" in result
def test_catalog_entry_exists(self):
"""web_fetch should appear in the tool catalog."""
from timmy.tools import get_all_available_tools
catalog = get_all_available_tools()
assert "web_fetch" in catalog
assert "orchestrator" in catalog["web_fetch"]["available_in"]
def _make_exceptions():
"""Create a mock exceptions module with real exception classes."""
class Timeout(Exception):
pass
class HTTPError(Exception):
def __init__(self, *args, response=None, **kwargs):
super().__init__(*args, **kwargs)
self.response = response
class RequestException(Exception):
pass
mod = MagicMock()
mod.Timeout = Timeout
mod.HTTPError = HTTPError
mod.RequestException = RequestException
return mod